diff options
Diffstat (limited to '')
-rw-r--r-- | runtime/queue.c | 3609 |
1 files changed, 3609 insertions, 0 deletions
diff --git a/runtime/queue.c b/runtime/queue.c new file mode 100644 index 0000000..edc9d35 --- /dev/null +++ b/runtime/queue.c @@ -0,0 +1,3609 @@ +/* queue.c + * + * This file implements the queue object and its several queueing methods. + * + * File begun on 2008-01-03 by RGerhards + * + * There is some in-depth documentation available in doc/dev_queue.html + * (and in the web doc set on https://www.rsyslog.com/doc/). Be sure to read it + * if you are getting aquainted to the object. + * + * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static + * function names - this makes it really hard to read and does not provide much + * benefit, at least I (now) think so... + * + * Copyright 2008-2019 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ +#include "config.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <pthread.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/stat.h> /* required for HP UX */ +#include <time.h> +#include <errno.h> +#include <inttypes.h> + +#include "rsyslog.h" +#include "queue.h" +#include "stringbuf.h" +#include "srUtils.h" +#include "obj.h" +#include "wtp.h" +#include "wti.h" +#include "msg.h" +#include "obj.h" +#include "atomic.h" +#include "errmsg.h" +#include "datetime.h" +#include "unicode-helper.h" +#include "statsobj.h" +#include "parserif.h" +#include "rsconf.h" + +#ifdef OS_SOLARIS +# include <sched.h> +#endif + +/* static data */ +DEFobjStaticHelpers +DEFobjCurrIf(glbl) +DEFobjCurrIf(strm) +DEFobjCurrIf(datetime) +DEFobjCurrIf(statsobj) + +#if __GNUC__ >= 8 +#pragma GCC diagnostic ignored "-Wcast-function-type" // TODO: investigate further! +#endif /* if __GNUC__ >= 8 */ + +#ifdef ENABLE_IMDIAG +unsigned int iOverallQueueSize = 0; +#endif + +#define OVERSIZE_QUEUE_WATERMARK 500000 /* when is a queue considered to be "overly large"? */ + + +/* forward-definitions */ +static rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, smsg_t *pMsg); +static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); +static rsRetVal RateLimiter(qqueue_t *pThis); +static rsRetVal qqueueChkStopWrkrDA(qqueue_t *pThis); +static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); +static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); +static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); +static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qAddDirect(qqueue_t *pThis, smsg_t *pMsg); +static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); +static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); +static rsRetVal qDestructDisk(qqueue_t *pThis); +rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir); + +/* some constants for queuePersist () */ +#define QUEUE_CHECKPOINT 1 +#define QUEUE_NO_CHECKPOINT 0 + +/* tables for interfacing with the v6 config system */ +static struct cnfparamdescr cnfpdescr[] = { + { "queue.filename", eCmdHdlrGetWord, 0 }, + { "queue.spooldirectory", eCmdHdlrGetWord, 0 }, + { "queue.size", eCmdHdlrSize, 0 }, + { "queue.dequeuebatchsize", eCmdHdlrInt, 0 }, + { "queue.mindequeuebatchsize", eCmdHdlrInt, 0 }, + { "queue.mindequeuebatchsize.timeout", eCmdHdlrInt, 0 }, + { "queue.maxdiskspace", eCmdHdlrSize, 0 }, + { "queue.highwatermark", eCmdHdlrInt, 0 }, + { "queue.lowwatermark", eCmdHdlrInt, 0 }, + { "queue.fulldelaymark", eCmdHdlrInt, 0 }, + { "queue.lightdelaymark", eCmdHdlrInt, 0 }, + { "queue.discardmark", eCmdHdlrInt, 0 }, + { "queue.discardseverity", eCmdHdlrFacility, 0 }, + { "queue.checkpointinterval", eCmdHdlrInt, 0 }, + { "queue.syncqueuefiles", eCmdHdlrBinary, 0 }, + { "queue.type", eCmdHdlrQueueType, 0 }, + { "queue.workerthreads", eCmdHdlrInt, 0 }, + { "queue.timeoutshutdown", eCmdHdlrInt, 0 }, + { "queue.timeoutactioncompletion", eCmdHdlrInt, 0 }, + { "queue.timeoutenqueue", eCmdHdlrInt, 0 }, + { "queue.timeoutworkerthreadshutdown", eCmdHdlrInt, 0 }, + { "queue.workerthreadminimummessages", eCmdHdlrInt, 0 }, + { "queue.maxfilesize", eCmdHdlrSize, 0 }, + { "queue.saveonshutdown", eCmdHdlrBinary, 0 }, + { "queue.dequeueslowdown", eCmdHdlrInt, 0 }, + { "queue.dequeuetimebegin", eCmdHdlrInt, 0 }, + { "queue.dequeuetimeend", eCmdHdlrInt, 0 }, + { "queue.cry.provider", eCmdHdlrGetWord, 0 }, + { "queue.samplinginterval", eCmdHdlrInt, 0 }, + { "queue.takeflowctlfrommsg", eCmdHdlrBinary, 0 } +}; +static struct cnfparamblk pblk = + { CNFPARAMBLK_VERSION, + sizeof(cnfpdescr)/sizeof(struct cnfparamdescr), + cnfpdescr + }; + +/* support to detect duplicate queue file names */ +struct queue_filename { + struct queue_filename *next; + const char *dirname; + const char *filename; +}; +struct queue_filename *queue_filename_root = NULL; + +/* debug aid */ +#if 0 +static inline void displayBatchState(batch_t *pBatch) +{ + int i; + for(i = 0 ; i < pBatch->nElem ; ++i) { + DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->eltState[i]); + } +} +#endif +static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint); + +/* do cleanup when config is loaded */ +void qqueueDoneLoadCnf(void) +{ + struct queue_filename *next, *del; + next = queue_filename_root; + while(next != NULL) { + del = next; + next = next->next; + free((void*) del->filename); + free((void*) del->dirname); + free((void*) del); + } +} + + +/*********************************************************************** + * we need a private data structure, the "to-delete" list. As C does + * not provide any partly private data structures, we implement this + * structure right here inside the module. + * Note that this list must always be kept sorted based on a unique + * dequeue ID (which is monotonically increasing). + * rgerhards, 2009-05-18 + ***********************************************************************/ + +/* generate next uniqueue dequeue ID. Note that uniqueness is only required + * on a per-queue basis and while this instance runs. So a stricly monotonically + * increasing counter is sufficient (if enough bits are used). + */ +static inline qDeqID getNextDeqID(qqueue_t *pQueue) +{ + ISOBJ_TYPE_assert(pQueue, qqueue); + return pQueue->deqIDAdd++; +} + + +/* return the top element of the to-delete list or NULL, if the + * list is empty. + */ +static toDeleteLst_t *tdlPeek(qqueue_t *pQueue) +{ + ISOBJ_TYPE_assert(pQueue, qqueue); + return pQueue->toDeleteLst; +} + + +/* remove the top element of the to-delete list. Nothing but the + * element itself is destroyed. Must not be called when the list + * is empty. + */ +static rsRetVal tdlPop(qqueue_t *pQueue) +{ + toDeleteLst_t *pRemove; + DEFiRet; + + ISOBJ_TYPE_assert(pQueue, qqueue); + assert(pQueue->toDeleteLst != NULL); + + pRemove = pQueue->toDeleteLst; + pQueue->toDeleteLst = pQueue->toDeleteLst->pNext; + free(pRemove); + + RETiRet; +} + + +/* Add a new to-delete list entry. The function allocates the data + * structure, populates it with the values provided and links the new + * element into the correct place inside the list. + */ +static rsRetVal +tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElemDeq) +{ + toDeleteLst_t *pNew; + toDeleteLst_t *pPrev; + DEFiRet; + + ISOBJ_TYPE_assert(pQueue, qqueue); + assert(pQueue->toDeleteLst != NULL); + + CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t))); + pNew->deqID = deqID; + pNew->nElemDeq = nElemDeq; + + /* now find right spot */ + for( pPrev = pQueue->toDeleteLst + ; pPrev != NULL && deqID > pPrev->deqID + ; pPrev = pPrev->pNext) { + /*JUST SEARCH*/; + } + + if(pPrev == NULL) { + pNew->pNext = pQueue->toDeleteLst; + pQueue->toDeleteLst = pNew; + } else { + pNew->pNext = pPrev->pNext; + pPrev->pNext = pNew; + } + +finalize_it: + RETiRet; +} + + +/* methods */ + +static const char * +getQueueTypeName(queueType_t t) +{ + const char *r; + + switch(t) { + case QUEUETYPE_FIXED_ARRAY: + r = "FixedArray"; + break; + case QUEUETYPE_LINKEDLIST: + r = "LinkedList"; + break; + case QUEUETYPE_DISK: + r = "Disk"; + break; + case QUEUETYPE_DIRECT: + r = "Direct"; + break; + default: + r = "invalid/unknown queue mode"; + break; + } + return r; +} + +void +qqueueDbgPrint(qqueue_t *pThis) +{ + dbgoprint((obj_t*) pThis, "parameter dump:\n"); + dbgoprint((obj_t*) pThis, "queue.filename '%s'\n", + (pThis->pszFilePrefix == NULL) ? "[NONE]" : (char*)pThis->pszFilePrefix); + dbgoprint((obj_t*) pThis, "queue.size: %d\n", pThis->iMaxQueueSize); + dbgoprint((obj_t*) pThis, "queue.dequeuebatchsize: %d\n", pThis->iDeqBatchSize); + dbgoprint((obj_t*) pThis, "queue.mindequeuebatchsize: %d\n", pThis->iMinDeqBatchSize); + dbgoprint((obj_t*) pThis, "queue.mindequeuebatchsize.timeout: %d\n", pThis->toMinDeqBatchSize); + dbgoprint((obj_t*) pThis, "queue.maxdiskspace: %lld\n", pThis->sizeOnDiskMax); + dbgoprint((obj_t*) pThis, "queue.highwatermark: %d\n", pThis->iHighWtrMrk); + dbgoprint((obj_t*) pThis, "queue.lowwatermark: %d\n", pThis->iLowWtrMrk); + dbgoprint((obj_t*) pThis, "queue.fulldelaymark: %d\n", pThis->iFullDlyMrk); + dbgoprint((obj_t*) pThis, "queue.lightdelaymark: %d\n", pThis->iLightDlyMrk); + dbgoprint((obj_t*) pThis, "queue.takeflowctlfrommsg: %d\n", pThis->takeFlowCtlFromMsg); + dbgoprint((obj_t*) pThis, "queue.discardmark: %d\n", pThis->iDiscardMrk); + dbgoprint((obj_t*) pThis, "queue.discardseverity: %d\n", pThis->iDiscardSeverity); + dbgoprint((obj_t*) pThis, "queue.checkpointinterval: %d\n", pThis->iPersistUpdCnt); + dbgoprint((obj_t*) pThis, "queue.syncqueuefiles: %d\n", pThis->bSyncQueueFiles); + dbgoprint((obj_t*) pThis, "queue.type: %d [%s]\n", pThis->qType, getQueueTypeName(pThis->qType)); + dbgoprint((obj_t*) pThis, "queue.workerthreads: %d\n", pThis->iNumWorkerThreads); + dbgoprint((obj_t*) pThis, "queue.timeoutshutdown: %d\n", pThis->toQShutdown); + dbgoprint((obj_t*) pThis, "queue.timeoutactioncompletion: %d\n", pThis->toActShutdown); + dbgoprint((obj_t*) pThis, "queue.timeoutenqueue: %d\n", pThis->toEnq); + dbgoprint((obj_t*) pThis, "queue.timeoutworkerthreadshutdown: %d\n", pThis->toWrkShutdown); + dbgoprint((obj_t*) pThis, "queue.workerthreadminimummessages: %d\n", pThis->iMinMsgsPerWrkr); + dbgoprint((obj_t*) pThis, "queue.maxfilesize: %lld\n", pThis->iMaxFileSize); + dbgoprint((obj_t*) pThis, "queue.saveonshutdown: %d\n", pThis->bSaveOnShutdown); + dbgoprint((obj_t*) pThis, "queue.dequeueslowdown: %d\n", pThis->iDeqSlowdown); + dbgoprint((obj_t*) pThis, "queue.dequeuetimebegin: %d\n", pThis->iDeqtWinFromHr); + dbgoprint((obj_t*) pThis, "queue.dequeuetimeend: %d\n", pThis->iDeqtWinToHr); +} + + +/* get the physical queue size. Must only be called + * while mutex is locked! + * rgerhards, 2008-01-29 + */ +static int +getPhysicalQueueSize(qqueue_t *pThis) +{ + return (int) PREFER_FETCH_32BIT(pThis->iQueueSize); +} + + +/* get the logical queue size (that is store size minus logically dequeued elements). + * Must only be called while mutex is locked! + * rgerhards, 2009-05-19 + */ +static int +getLogicalQueueSize(qqueue_t *pThis) +{ + return pThis->iQueueSize - pThis->nLogDeq; +} + + + +/* This function drains the queue in cases where this needs to be done. The most probable + * reason is a HUP which needs to discard data (because the queue is configured to be lossy). + * During a shutdown, this is typically not needed, as the OS frees up ressources and does + * this much quicker than when we clean up ourselvs. -- rgerhards, 2008-10-21 + * This function returns void, as it makes no sense to communicate an error back, even if + * it happens. + * This functions works "around" the regular deque mechanism, because it is only used to + * clean up (in cases where message loss is acceptable). + */ +static void queueDrain(qqueue_t *pThis) +{ + smsg_t *pMsg; + assert(pThis != NULL); + + DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", + pThis->qType, pThis->iQueueSize); + /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ + while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { + pThis->qDeq(pThis, &pMsg); + if(pMsg != NULL) { + msgDestruct(&pMsg); + } + pThis->qDel(pThis); + } +} + + +/* --------------- code for disk-assisted (DA) queue modes -------------------- */ + + +/* returns the number of workers that should be advised at + * this point in time. The mutex must be locked when + * ths function is called. -- rgerhards, 2008-01-25 + */ +static rsRetVal +qqueueAdviseMaxWorkers(qqueue_t *pThis) +{ + DEFiRet; + int iMaxWorkers; + + ISOBJ_TYPE_assert(pThis, qqueue); + + if(!pThis->bEnqOnly) { + if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { + DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n"); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1, DENY_WORKER_START_DURING_SHUTDOWN); + /* disk queues have always one worker */ + } + if(getLogicalQueueSize(pThis) == 0) { + iMaxWorkers = 0; + } else if(pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; + } else { + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers, DENY_WORKER_START_DURING_SHUTDOWN); + } + + RETiRet; +} + + +/* check if we run in disk-assisted mode and record that + * setting for easy (and quick!) access in the future. This + * function must only be called from constructors and only + * from those that support disk-assisted modes (aka memory- + * based queue drivers). + * rgerhards, 2008-01-14 + */ +static rsRetVal +qqueueChkIsDA(qqueue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + if(pThis->pszFilePrefix != NULL) { + pThis->bIsDA = 1; + DBGOPRINT((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n"); + } else { + DBGOPRINT((obj_t*) pThis, "is NOT disk-assisted\n"); + } + + RETiRet; +} + + +/* Start disk-assisted queue mode. + * rgerhards, 2008-01-15 + */ +static rsRetVal +StartDA(qqueue_t *pThis) +{ + DEFiRet; + uchar pszDAQName[128]; + + ISOBJ_TYPE_assert(pThis, qqueue); + + /* create message queue */ + CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK, pThis->iNumWorkerThreads, 0, pThis->pConsumer)); + + /* give it a name */ + snprintf((char*) pszDAQName, sizeof(pszDAQName), "%s[DA]", obj.GetName((obj_t*) pThis)); + obj.SetName((obj_t*) pThis->pqDA, pszDAQName); + + /* as the created queue is the same object class, we take the + * liberty to access its properties directly. + */ + pThis->pqDA->pqParent = pThis; + + CHKiRet(qqueueSetpAction(pThis->pqDA, pThis->pAction)); + CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); + CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); + CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); + CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(qqueueSetSpoolDir(pThis->pqDA, pThis->pszSpoolDir, pThis->lenSpoolDir)); + CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); + CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles)); + CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); + CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); + CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); + CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, pThis->toQShutdown)); + CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); + CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); + pThis->pqDA->iDeqBatchSize = pThis->iDeqBatchSize; + pThis->pqDA->iMinDeqBatchSize = pThis->iMinDeqBatchSize; + pThis->pqDA->iMinMsgsPerWrkr = pThis->iMinMsgsPerWrkr; + pThis->pqDA->iLowWtrMrk = pThis->iLowWtrMrk; + if(pThis->useCryprov) { + /* hand over cryprov to DA queue - in-mem queue does no longer need it + * and DA queue will be kept active from now on until termination. + */ + pThis->pqDA->useCryprov = pThis->useCryprov; + pThis->pqDA->cryprov = pThis->cryprov; + pThis->pqDA->cryprovData = pThis->cryprovData; + pThis->pqDA->cryprovName = pThis->cryprovName; + pThis->pqDA->cryprovNameFull = pThis->cryprovNameFull; + /* reset memory queue parameters */ + pThis->useCryprov = 0; + /* pThis->cryprov cannot and need not be reset, is structure */ + pThis->cryprovData = NULL; + pThis->cryprovName = NULL; + pThis->cryprovNameFull = NULL; + } + + iRet = qqueueStart(runConf, pThis->pqDA); + /* file not found is expected, that means it is no previous QIF available */ + if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) { + errno = 0; /* else an errno is shown in errmsg! */ + LogError(errno, iRet, "error starting up disk queue, using pure in-memory mode"); + pThis->bIsDA = 0; /* disable memory mode */ + FINALIZE; /* something is wrong */ + } + + DBGOPRINT((obj_t*) pThis, "DA queue initialized, disk queue 0x%lx\n", + qqueueGetID(pThis->pqDA)); + +finalize_it: + if(iRet != RS_RET_OK) { + if(pThis->pqDA != NULL) { + qqueueDestruct(&pThis->pqDA); + } + LogError(0, iRet, "%s: error creating disk queue - giving up.", + obj.GetName((obj_t*)pThis)); + pThis->bIsDA = 0; + } + + RETiRet; +} + + +/* initiate DA mode + * param bEnqOnly tells if the disk queue is to be run in enqueue-only mode. This may + * be needed during shutdown of memory queues which need to be persisted to disk. + * If this function fails (should not happen), DA mode is not turned on. + * rgerhards, 2008-01-16 + */ +static rsRetVal ATTR_NONNULL() +InitDA(qqueue_t *const pThis, const int bLockMutex) +{ + DEFiRet; + uchar pszBuf[64]; + size_t lenBuf; + + ISOBJ_TYPE_assert(pThis, qqueue); + if(bLockMutex == LOCK_MUTEX) { + d_pthread_mutex_lock(pThis->mut); + } + + /* check if we already have a DA worker pool. If not, initiate one. Please note that the + * pool is created on first need but never again destructed (until the queue is). This + * is intentional. We assume that when we need it once, we may also need it on another + * occasion. Ressources used are quite minimal when no worker is running. + * rgerhards, 2008-01-24 + * NOTE: this is the DA worker *pool*, not the DA queue! + */ + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis)); + CHKiRet(wtpConstruct (&pThis->pWtpDA)); + CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); + CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); + CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); + CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); + CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); + CHKiRet(wtpConstructFinalize (pThis->pWtpDA)); + /* if we reach this point, we have a "good" DA worker pool */ + + /* now construct the actual queue (if it does not already exist) */ + if(pThis->pqDA == NULL) { + CHKiRet(StartDA(pThis)); + } + +finalize_it: + if(bLockMutex == LOCK_MUTEX) { + d_pthread_mutex_unlock(pThis->mut); + } + RETiRet; +} + + +/* --------------- end code for disk-assisted queue modes -------------------- */ + + +/* Now, we define type-specific handlers. The provide a generic functionality, + * but for this specific type of queue. The mapping to these handlers happens during + * queue construction. Later on, handlers are called by pointers present in the + * queue instance object. + */ + +/* -------------------- fixed array -------------------- */ +static rsRetVal qConstructFixedArray(qqueue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + if(pThis->iMaxQueueSize == 0) + ABORT_FINALIZE(RS_RET_QSIZE_ZERO); + + if((pThis->tVars.farray.pBuf = malloc(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + + pThis->tVars.farray.deqhead = 0; + pThis->tVars.farray.head = 0; + pThis->tVars.farray.tail = 0; + + qqueueChkIsDA(pThis); + +finalize_it: + RETiRet; +} + + +static rsRetVal qDestructFixedArray(qqueue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + queueDrain(pThis); /* discard any remaining queue entries */ + free(pThis->tVars.farray.pBuf); + + RETiRet; +} + + +static rsRetVal qAddFixedArray(qqueue_t *pThis, smsg_t* in) +{ + DEFiRet; + + assert(pThis != NULL); + pThis->tVars.farray.pBuf[pThis->tVars.farray.tail] = in; + pThis->tVars.farray.tail++; + if (pThis->tVars.farray.tail == pThis->iMaxQueueSize) + pThis->tVars.farray.tail = 0; + + RETiRet; +} + + +static rsRetVal qDeqFixedArray(qqueue_t *pThis, smsg_t **out) +{ + DEFiRet; + + assert(pThis != NULL); + *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead]; + + pThis->tVars.farray.deqhead++; + if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize) + pThis->tVars.farray.deqhead = 0; + + RETiRet; +} + + +static rsRetVal qDelFixedArray(qqueue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + pThis->tVars.farray.head++; + if (pThis->tVars.farray.head == pThis->iMaxQueueSize) + pThis->tVars.farray.head = 0; + + RETiRet; +} + + +/* -------------------- linked list -------------------- */ + + +static rsRetVal qConstructLinkedList(qqueue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + pThis->tVars.linklist.pDeqRoot = NULL; + pThis->tVars.linklist.pDelRoot = NULL; + pThis->tVars.linklist.pLast = NULL; + + qqueueChkIsDA(pThis); + + RETiRet; +} + + +static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) +{ + DEFiRet; + + queueDrain(pThis); /* discard any remaining queue entries */ + + /* with the linked list type, there is nothing left to do here. The + * reason is that there are no dynamic elements for the list itself. + */ + + RETiRet; +} + +static rsRetVal qAddLinkedList(qqueue_t *pThis, smsg_t* pMsg) +{ + qLinkedList_t *pEntry; + DEFiRet; + + CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t)))); + + pEntry->pNext = NULL; + pEntry->pMsg = pMsg; + + if(pThis->tVars.linklist.pDelRoot == NULL) { + pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast + = pEntry; + } else { + pThis->tVars.linklist.pLast->pNext = pEntry; + pThis->tVars.linklist.pLast = pEntry; + } + + if(pThis->tVars.linklist.pDeqRoot == NULL) { + pThis->tVars.linklist.pDeqRoot = pEntry; + } + +finalize_it: + RETiRet; +} + + +static rsRetVal qDeqLinkedList(qqueue_t *pThis, smsg_t **ppMsg) +{ + qLinkedList_t *pEntry; + DEFiRet; + + pEntry = pThis->tVars.linklist.pDeqRoot; + if (pEntry != NULL) { + *ppMsg = pEntry->pMsg; + pThis->tVars.linklist.pDeqRoot = pEntry->pNext; + } else { + /* Check and return NULL for linklist.pDeqRoot */ + dbgprintf("qDeqLinkedList: pDeqRoot is NULL!\n"); + *ppMsg = NULL; + pThis->tVars.linklist.pDeqRoot = NULL; + } + + RETiRet; +} + + +static rsRetVal qDelLinkedList(qqueue_t *pThis) +{ + qLinkedList_t *pEntry; + DEFiRet; + + pEntry = pThis->tVars.linklist.pDelRoot; + + if(pThis->tVars.linklist.pDelRoot == pThis->tVars.linklist.pLast) { + pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = NULL; + } else { + pThis->tVars.linklist.pDelRoot = pEntry->pNext; + } + + free(pEntry); + + RETiRet; +} + + +/* -------------------- disk -------------------- */ + + +/* The following function is used to "save" ourself from being killed by + * a fatally failed disk queue. A fatal failure is, for example, if no + * data can be read or written. In that case, the disk support is disabled, + * with all on-disk structures kept as-is as much as possible. However, + * we do not really stop or destruct the in-memory disk queue object. + * Practice has shown that this may cause races during destruction which + * themselfs can lead to segfault. So we prefer to was some ressources by + * keeping the queue active. + * Instead, the queue is switched to direct mode, so that at least + * some processing can happen. Of course, this may still have lots of + * undesired side-effects, but is probably better than aborting the + * syslogd. Note that this function *must* succeed in one way or another, as + * we can not recover from failure here. But it may emit different return + * states, which can trigger different processing in the higher layers. + * rgerhards, 2011-05-03 + */ +static rsRetVal +queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) +{ + pThis->iQueueSize = 0; + pThis->nLogDeq = 0; + + pThis->qType = QUEUETYPE_DIRECT; + pThis->qConstruct = qConstructDirect; + pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ + pThis->qAdd = qAddDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; + if(pThis->pqParent != NULL) { + DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); + pThis->pqParent->bIsDA = 0; + pThis->pqParent->pqDA = NULL; + /* This may have undesired side effects, not sure if I really evaluated + * all. So you know where to look at if you come to this point during + * troubleshooting ;) -- rgerhards, 2011-05-03 + */ + } + + LogError(0, initiatingError, "fatal error on disk queue '%s', " + "emergency switch to direct mode", obj.GetName((obj_t*) pThis)); + return RS_RET_ERR_QUEUE_EMERGENCY; +} + + +static rsRetVal +qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis) +{ + DEFiRet; + ISOBJ_TYPE_assert(pStrm, strm); + ISOBJ_TYPE_assert(pThis, qqueue); + CHKiRet(strm.SetDir(pStrm, pThis->pszSpoolDir, pThis->lenSpoolDir)); + CHKiRet(strm.SetbSync(pStrm, pThis->bSyncQueueFiles)); +finalize_it: + RETiRet; +} + + +/* The method loads the persistent queue information. + * rgerhards, 2008-01-11 + */ +static rsRetVal +qqueueTryLoadPersistedInfo(qqueue_t *pThis) +{ + DEFiRet; + strm_t *psQIF = NULL; + struct stat stat_buf; + + ISOBJ_TYPE_assert(pThis, qqueue); + + /* check if the file exists */ + if(stat((char*) pThis->pszQIFNam, &stat_buf) == -1) { + if(errno == ENOENT) { + DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n"); + ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); + } else { + LogError(errno, RS_RET_IO_ERROR, "queue: %s: error %d could not access .qi file", + obj.GetName((obj_t*) pThis), errno); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + } + + /* If we reach this point, we have a .qi file */ + + CHKiRet(strm.Construct(&psQIF)); + CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ)); + CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam)); + CHKiRet(strm.ConstructFinalize(psQIF)); + + /* first, we try to read the property bag for ourselfs */ + CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF)); + + /* then the stream objects (same order as when persisted!) */ + CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF, + (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); + CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, + (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); + /* create a duplicate for the read "pointer". */ + CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq)); + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */ + CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq)); + /* if we use a crypto provider, we need to amend the objects with it's info */ + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData)); + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData)); + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData)); + } + + CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pWrite)); + CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDel)); + CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDeq)); + + /* OK, we could successfully read the file, so we now can request that it be + * deleted when we are done with the persisted information. + */ + pThis->bNeedDelQIF = 1; + LogMsg(0, RS_RET_OK, LOG_INFO, "%s: queue files exist on disk, re-starting with " + "%d messages. This will keep the disk queue file open, details: " + "https://rainer.gerhards.net/2013/07/rsyslog-why-disk-assisted-queues-keep-a-file-open.html", + objGetName((obj_t*) pThis), getLogicalQueueSize(pThis)); + +finalize_it: + if(psQIF != NULL) + strm.Destruct(&psQIF); + + if(iRet != RS_RET_OK) { + DBGOPRINT((obj_t*) pThis, "state %d reading .qi file - can not read persisted info (if any)\n", + iRet); + } + + RETiRet; +} + + +/* disk queue constructor. + * Note that we use a file limit of 10,000,000 files. That number should never pose a + * problem. If so, I guess the user has a design issue... But of course, the code can + * always be changed (though it would probably be more appropriate to increase the + * allowed file size at this point - that should be a config setting... + * rgerhards, 2008-01-10 + */ +static rsRetVal qConstructDisk(qqueue_t *pThis) +{ + DEFiRet; + int bRestarted = 0; + + assert(pThis != NULL); + + /* and now check if there is some persistent information that needs to be read in */ + iRet = qqueueTryLoadPersistedInfo(pThis); + if(iRet == RS_RET_OK) + bRestarted = 1; + else if(iRet != RS_RET_FILE_NOT_FOUND) + FINALIZE; + + if(bRestarted == 1) { + ; + } else { + CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite)); + CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles)); + CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, pThis->pszSpoolDir, pThis->lenSpoolDir)); + CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); + CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); + CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData)); + } + CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite)); + + CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq)); + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); + CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, pThis->pszSpoolDir, pThis->lenSpoolDir)); + CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); + CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); + CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData)); + } + CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq)); + + CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel)); + CHKiRet(strm.SetbSync(pThis->tVars.disk.pReadDel, pThis->bSyncQueueFiles)); + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); + CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, pThis->pszSpoolDir, pThis->lenSpoolDir)); + CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000)); + CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ)); + CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR)); + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData)); + } + CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDel)); + + CHKiRet(strm.SetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strm.SetFName(pThis->tVars.disk.pReadDeq, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strm.SetFName(pThis->tVars.disk.pReadDel, pThis->pszFilePrefix, pThis->lenFilePrefix)); + } + + /* now we set (and overwrite in case of a persisted restart) some parameters which + * should always reflect the current configuration variables. Be careful by doing so, + * for example file name generation must not be changed as that would break the + * ability to read existing queue files. -- rgerhards, 2008-01-12 + */ + CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); + CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pReadDeq, pThis->iMaxFileSize)); + CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pReadDel, pThis->iMaxFileSize)); + +finalize_it: + RETiRet; +} + + +static rsRetVal qDestructDisk(qqueue_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + free(pThis->pszQIFNam); + if(pThis->tVars.disk.pWrite != NULL) { + int64 currOffs; + strm.GetCurrOffset(pThis->tVars.disk.pWrite, &currOffs); + if(currOffs == 0) { + /* if no data is present, we can (and must!) delete this + * file. Else we can leave garbagge after termination. + */ + strm.SetbDeleteOnClose(pThis->tVars.disk.pWrite, 1); + } + strm.Destruct(&pThis->tVars.disk.pWrite); + } + if(pThis->tVars.disk.pReadDeq != NULL) + strm.Destruct(&pThis->tVars.disk.pReadDeq); + if(pThis->tVars.disk.pReadDel != NULL) + strm.Destruct(&pThis->tVars.disk.pReadDel); + + RETiRet; +} + +static rsRetVal ATTR_NONNULL(1,2) +qAddDisk(qqueue_t *const pThis, smsg_t* pMsg) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, qqueue); + ISOBJ_TYPE_assert(pMsg, msg); + number_t nWriteCount; + const int oldfile = strmGetCurrFileNum(pThis->tVars.disk.pWrite); + + CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); + CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite)); + CHKiRet(strm.Flush(pThis->tVars.disk.pWrite)); + CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ + + pThis->tVars.disk.sizeOnDisk += nWriteCount; + + /* we have enqueued the user element to disk. So we now need to destruct + * the in-memory representation. The instance will be re-created upon + * dequeue. -- rgerhards, 2008-07-09 + */ + msgDestruct(&pMsg); + + DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n", + nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly); + + /* Did we have a change in the on-disk file? If so, we + * should do a "robustness sync" of the .qi file to guard + * against the most harsh consequences of kill -9 and power off. + */ + int newfile; + newfile = strmGetCurrFileNum(pThis->tVars.disk.pWrite); + if(newfile != oldfile) { + DBGOPRINT((obj_t*) pThis, "current to-be-written-to file has changed from " + "number %d to number %d - requiring a .qi write for robustness\n", + oldfile, newfile); + pThis->tVars.disk.nForcePersist = 2; + } + +finalize_it: + RETiRet; +} + + +static rsRetVal +qDeqDisk(qqueue_t *pThis, smsg_t **ppMsg) +{ + DEFiRet; + iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, + pThis->tVars.disk.pReadDeq, NULL, + NULL, msgConstructForDeserializer, NULL, MsgDeserialize); + if(iRet != RS_RET_OK) { + LogError(0, iRet, "%s: qDeqDisk error happened at around offset %lld", + obj.GetName((obj_t*)pThis), + (long long) pThis->tVars.disk.pReadDeq->iCurrOffs); + } + RETiRet; +} + + +/* -------------------- direct (no queueing) -------------------- */ +static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) +{ + return RS_RET_OK; +} + + +static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) +{ + return RS_RET_OK; +} + +static rsRetVal qAddDirectWithWti(qqueue_t *pThis, smsg_t* pMsg, wti_t *pWti) +{ + batch_t singleBatch; + batch_obj_t batchObj; + batch_state_t batchState = BATCH_STATE_RDY; + DEFiRet; + + //TODO: init batchObj (states _OK and new fields -- CHECK) + assert(pThis != NULL); + + /* calling the consumer is quite different here than it is from a worker thread */ + /* we need to provide the consumer's return value back to the caller because in direct + * mode the consumer probably has a lot to convey (which get's lost in the other modes + * because they are asynchronous. But direct mode is deliberately synchronous. + * rgerhards, 2008-02-12 + * We use our knowledge about the batch_t structure below, but without that, we + * pay a too-large performance toll... -- rgerhards, 2009-04-22 + */ + memset(&batchObj, 0, sizeof(batch_obj_t)); + memset(&singleBatch, 0, sizeof(batch_t)); + batchObj.pMsg = pMsg; + singleBatch.nElem = 1; /* there always is only one in direct mode */ + singleBatch.pElem = &batchObj; + singleBatch.eltState = &batchState; + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti); + msgDestruct(&pMsg); + + RETiRet; +} + +/* this is called if we do not have a pWti. This currently only happens + * when we are called from a main queue in direct mode. If so, we need + * to obtain a dummy pWti. + */ +static rsRetVal +qAddDirect(qqueue_t *pThis, smsg_t* pMsg) +{ + wti_t *pWti; + DEFiRet; + + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + iRet = qAddDirectWithWti(pThis, pMsg, pWti); + RETiRet; +} + + +/* --------------- end type-specific handlers -------------------- */ + + +/* generic code to add a queue entry + * We use some specific code to most efficiently support direct mode + * queues. This is justified in spite of the gain and the need to do some + * things truely different. -- rgerhards, 2008-02-12 + */ +static rsRetVal +qqueueAdd(qqueue_t *pThis, smsg_t *pMsg) +{ + DEFiRet; + + assert(pThis != NULL); + + static int msgCnt = 0; + + if(pThis->iSmpInterval > 0) + { + msgCnt = (msgCnt + 1) % (pThis->iSmpInterval); + if(msgCnt != 0) + { + msgDestruct(&pMsg); + goto finalize_it; + } + } + + CHKiRet(pThis->qAdd(pThis, pMsg)); + + if(pThis->qType != QUEUETYPE_DIRECT) { + ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); +# ifdef ENABLE_IMDIAG +# ifdef HAVE_ATOMIC_BUILTINS + /* mutex is never used due to conditional compilation */ + ATOMIC_INC(&iOverallQueueSize, &NULL); +# else + ++iOverallQueueSize; /* racy, but we can't wait for a mutex! */ +# endif +# endif + } + +finalize_it: + RETiRet; +} + + +/* generic code to dequeue a queue entry + */ +static rsRetVal +qqueueDeq(qqueue_t *pThis, smsg_t **ppMsg) +{ + DEFiRet; + + assert(pThis != NULL); + + /* we do NOT abort if we encounter an error, because otherwise the queue + * will not be decremented, what will most probably result in an endless loop. + * If we decrement, however, we may lose a message. But that is better than + * losing the whole process because it loops... -- rgerhards, 2008-01-03 + */ + iRet = pThis->qDeq(pThis, ppMsg); + ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq); + + DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + + RETiRet; +} + + +/* Try to shut down regular and DA queue workers, within the queue timeout + * period. That means processing continues as usual. This is the expected + * usual case, where during shutdown those messages remaining are being + * processed. At this point, it is acceptable that the queue can not be + * fully depleted, that case is handled in the next step. During this phase, + * we first shut down the main queue DA worker to prevent new data to arrive + * at the DA queue, and then we ask the regular workers of both the Regular + * and DA queue to try complete processing. + * rgerhards, 2009-10-14 + */ +static rsRetVal ATTR_NONNULL(1) +tryShutdownWorkersWithinQueueTimeout(qqueue_t *const pThis) +{ + struct timespec tTimeout; + rsRetVal iRetLocal; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pThis->pqParent == NULL); /* detect invalid calling sequence */ + + if(pThis->bIsDA) { + /* We need to lock the mutex, as otherwise we may have a race that prevents + * us from awaking the DA worker. */ + d_pthread_mutex_lock(pThis->mut); + + /* tell regular queue DA worker to stop shuffling messages to DA queue... */ + DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode for DA worker\n"); + pThis->pqDA->bEnqOnly = 1; + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1, DENY_WORKER_START_DURING_SHUTDOWN); + DBGOPRINT((obj_t*) pThis, "awoke DA worker, told it to shut down.\n"); + + /* also tell the DA queue worker to shut down, so that it already knows... */ + wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN); + wtpAdviseMaxWorkers(pThis->pqDA->pWtpReg, 1, DENY_WORKER_START_DURING_SHUTDOWN); + /* awake its lone worker */ + DBGOPRINT((obj_t*) pThis, "awoke DA queue regular worker, told it to shut down when done.\n"); + + d_pthread_mutex_unlock(pThis->mut); + } + + + /* first calculate absolute timeout - we need the absolute value here, because we need to coordinate + * shutdown of both the regular and DA queue on *the same* timeout. + */ + timeoutComp(&tTimeout, pThis->toQShutdown); + DBGOPRINT((obj_t*) pThis, "trying shutdown of regular workers\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO, + "%s: regular queue shutdown timed out on primary queue " + "(this is OK, timeout was %d)", + objGetName((obj_t*) pThis), pThis->toQShutdown); + } else { + DBGOPRINT((obj_t*) pThis, "regular queue workers shut down.\n"); + } + + /* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */ + if(pThis->pqDA != NULL) { + DBGOPRINT((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", + qqueueGetID(pThis->pqDA)); + /* we use the same absolute timeout as above, so we do not use more than the configured + * timeout interval! + */ + DBGOPRINT((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n"); + iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO, + "%s: regular queue shutdown timed out on DA queue (this is OK, " + "timeout was %d)", objGetName((obj_t*) pThis), pThis->toQShutdown); + } else { + DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n"); + } + } + + RETiRet; +} + + +/* Try to shut down regular and DA queue workers, within the action timeout + * period. This aborts processing, but at the end of the current action, in + * a well-defined manner. During this phase, we terminate all three worker + * pools, including the regular queue DA worker if it not yet has terminated. + * Not finishing processing all messages is OK (and expected) at this stage + * (they may be preserved later, depending * on bSaveOnShutdown setting). + * rgerhards, 2009-10-14 + */ +static rsRetVal +tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) +{ + struct timespec tTimeout; + rsRetVal iRetLocal; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pThis->pqParent == NULL); /* detect invalid calling sequence */ + + /* instruct workers to finish ASAP, even if still work exists */ + DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout"); + DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n"); + pThis->bEnqOnly = 1; + pThis->bShutdownImmediate = 1; + /* now DA queue */ + if(pThis->bIsDA) { + pThis->pqDA->bEnqOnly = 1; + pThis->pqDA->bShutdownImmediate = 1; + } + + /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ + timeoutComp(&tTimeout, pThis->toActShutdown); + DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO, + "%s: immediate shutdown timed out on primary queue (this is acceptable and " + "triggers cancellation)", objGetName((obj_t*) pThis)); + } else if(iRetLocal != RS_RET_OK) { + LogMsg(0, iRetLocal, LOG_WARNING, + "%s: potential internal error: unexpected return state after trying " + "immediate shutdown of the primary queue in disk save mode. " + "Continuing, but results are unpredictable", objGetName((obj_t*) pThis)); + } + + if(pThis->pqDA != NULL) { + /* and now the same for the DA queue */ + DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n"); + iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO, + "%s: immediate shutdown timed out on DA queue (this is acceptable and " + "triggers cancellation)", objGetName((obj_t*) pThis)); + } else if(iRetLocal != RS_RET_OK) { + LogMsg(0, iRetLocal, LOG_WARNING, + "%s: potential internal error: unexpected return state after trying " + "immediate shutdown of the DA queue in disk save mode. " + "Continuing, but results are unpredictable", objGetName((obj_t*) pThis)); + } + + /* and now we need to terminate the DA worker itself. We always grant it a 100ms timeout, + * which should be sufficient and usually not be required (it is expected to have finished + * long before while we were processing the queue timeout in shutdown phase 1). + * rgerhards, 2009-10-14 + */ + timeoutComp(&tTimeout, 100); + DBGOPRINT((obj_t*) pThis, "trying regular shutdown of main queue DA worker pool\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + LogMsg(0, iRetLocal, LOG_WARNING, + "%s: shutdown timed out on main queue DA worker pool " + "(this is not good, but possibly OK)", + objGetName((obj_t*) pThis)); + } else { + DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n"); + } + } + + RETiRet; +} + + +/* This function cancels all remaining regular workers for both the main and the DA + * queue. + * rgerhards, 2009-05-29 + */ +static rsRetVal +cancelWorkers(qqueue_t *pThis) +{ + rsRetVal iRetLocal; + DEFiRet; + + assert(pThis->qType != QUEUETYPE_DIRECT); + + /* Now queue workers should have terminated. If not, we need to cancel them as we have applied + * all timeout setting. If any worker in any queue still executes, its consumer is possibly + * long-running and cancelling is the only way to get rid of it. + */ + DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n"); + iRetLocal = wtpCancelAll(pThis->pWtpReg, objGetName((obj_t*) pThis)); + /* ^-- returns immediately if all threads already have terminated */ + if(iRetLocal != RS_RET_OK) { + DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker " + "threads, continuing, but results are unpredictable\n", iRetLocal); + } + + /* ... and now the DA queue, if it exists (should always be after the primary one) */ + if(pThis->pqDA != NULL) { + DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of " + "the DA queue\n"); + iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg, objGetName((obj_t*) pThis)); + /* returns immediately if all threads already have terminated */ + if(iRetLocal != RS_RET_OK) { + DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " + "threads, continuing, but results are unpredictable\n", iRetLocal); + } + + /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be + * restarted later to persist the queue. But we stop it, because otherwise we get into + * big trouble when resetting the logical dequeue pointer. This operation can only be + * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 + */ + DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n"); + wtpCancelAll(pThis->pWtpDA, objGetName((obj_t*) pThis)); + /* returns immediately if all threads already have terminated */ + } + + RETiRet; +} + + +/* This function shuts down all worker threads and waits until they + * have terminated. If they timeout, they are cancelled. + * rgerhards, 2008-01-24 + * Please note that this function shuts down BOTH the parent AND the child queue + * in DA case. This is necessary because their timeouts are tightly coupled. Most + * importantly, the timeouts would be applied twice (or logic be extremely + * complex) if each would have its own shutdown. The function does not self check + * this condition - the caller must make sure it is not called with a parent. + * rgerhards, 2009-05-26: we do NO longer persist the queue here if bSaveOnShutdown + * is set. This must be handled by the caller. Not doing that cleans up the queue + * shutdown considerably. Also, older engines had a potential hang condition when + * the DA queue was already started and the DA worker configured for infinite + * retries and the action was during retry processing. This was a design issue, + * which is solved as of now. Note that the shutdown now may take a little bit + * longer, because we no longer can persist the queue in parallel to waiting + * on worker timeouts. + */ +rsRetVal ATTR_NONNULL(1) +qqueueShutdownWorkers(qqueue_t *const pThis) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, qqueue); + + if(pThis->qType == QUEUETYPE_DIRECT) { + FINALIZE; + } + + assert(pThis->pqParent == NULL); /* detect invalid calling sequence */ + + DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence %p\n", pThis); + + CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); + + pthread_mutex_lock(pThis->mut); + int physQueueSize; + physQueueSize = getPhysicalQueueSize(pThis); + pthread_mutex_unlock(pThis->mut); + if(physQueueSize > 0) { + CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis)); + } + + CHKiRet(cancelWorkers(pThis)); + + /* ... finally ... all worker threads have terminated :-) + * Well, more precisely, they *are in termination*. Some cancel cleanup handlers + * may still be running. Note that the main queue's DA worker may still be running. + */ + DBGOPRINT((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + +finalize_it: + RETiRet; +} + +/* Constructor for the queue object + * This constructs the data structure, but does not yet start the queue. That + * is done by queueStart(). The reason is that we want to give the caller a chance + * to modify some parameters before the queue is actually started. + */ +rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*)) +{ + DEFiRet; + qqueue_t *pThis; + const uchar *const workDir = glblGetWorkDirRaw(ourConf); + + assert(ppThis != NULL); + assert(pConsumer != NULL); + assert(iWorkerThreads >= 0); + + CHKmalloc(pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))); + + /* we have an object, so let's fill the properties */ + objConstructSetObjInfo(pThis); + + if(workDir != NULL) { + if((pThis->pszSpoolDir = ustrdup(workDir)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir); + } + /* set some water marks so that we have useful defaults if none are set specifically */ + pThis->iFullDlyMrk = -1; + pThis->iLightDlyMrk = -1; + pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ + pThis->iQueueSize = 0; + pThis->nLogDeq = 0; + pThis->useCryprov = 0; + pThis->takeFlowCtlFromMsg = 0; + pThis->iMaxQueueSize = iMaxQueueSize; + pThis->pConsumer = pConsumer; + pThis->iNumWorkerThreads = iWorkerThreads; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ + pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */ + pThis->iMinDeqBatchSize = 0; /* conservative default, should still provide good performance */ + pThis->isRunning = 0; + + pThis->pszFilePrefix = NULL; + pThis->qType = qType; + + + INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq); + +finalize_it: + OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP + RETiRet; +} + + +/* set default inside queue object suitable for action queues. + * This shall be called directly after queue construction. This functions has + * been added in support of the new v6 config system. It expect properly pre-initialized + * objects, but we need to differentiate between ruleset main and action queues. + * In order to avoid unnecessary complexity, we provide the necessary defaults + * via specific function calls. + */ +void +qqueueSetDefaultsActionQueue(qqueue_t *pThis) +{ + pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */ + pThis->iMaxQueueSize = 1000; /* size of the main message queue above */ + pThis->iDeqBatchSize = 128; /* default batch size */ + pThis->iMinDeqBatchSize = 0; + pThis->toMinDeqBatchSize = 1000; + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ + pThis->iDiscardMrk = -1; /* begin to discard messages */ + pThis->iDiscardSeverity = 8; /* turn off */ + pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ + pThis->iMaxFileSize = 1024*1024; + pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */ + pThis->bSyncQueueFiles = 0; + pThis->toQShutdown = loadConf->globals.actq_dflt_toQShutdown; /* queue shutdown */ + pThis->toActShutdown = loadConf->globals.actq_dflt_toActShutdown; /* action shutdown (in phase 2) */ + pThis->toEnq = loadConf->globals.actq_dflt_toEnq; /* timeout for queue enque */ + pThis->toWrkShutdown = loadConf->globals.actq_dflt_toWrkShutdown; /* timeout for worker thread shutdown */ + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ + pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + pThis->sizeOnDiskMax = 0; /* unlimited */ + pThis->iDeqSlowdown = 0; + pThis->iDeqtWinFromHr = 0; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ + pThis->iSmpInterval = 0; /* disable sampling */ +} + + +/* set defaults inside queue object suitable for main/ruleset queues. + * See queueSetDefaultsActionQueue() for more details and background. + */ +void +qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) +{ + pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ + pThis->iMaxQueueSize = 50000; /* size of the main message queue above */ + pThis->iDeqBatchSize = 1024; /* default batch size */ + pThis->iMinDeqBatchSize = 0; + pThis->toMinDeqBatchSize = 1000; + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ + pThis->iDiscardMrk = -1; /* begin to discard messages */ + pThis->iDiscardSeverity = 8; /* turn off */ + pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ + pThis->iMaxFileSize = 16*1024*1024; + pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */ + pThis->bSyncQueueFiles = 0; + pThis->toQShutdown = ourConf->globals.ruleset_dflt_toQShutdown; + pThis->toActShutdown = ourConf->globals.ruleset_dflt_toActShutdown; + pThis->toEnq = ourConf->globals.ruleset_dflt_toEnq; + pThis->toWrkShutdown = ourConf->globals.ruleset_dflt_toWrkShutdown; + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ + pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + pThis->sizeOnDiskMax = 0; /* unlimited */ + pThis->iDeqSlowdown = 0; + pThis->iDeqtWinFromHr = 0; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ + pThis->iSmpInterval = 0; /* disable sampling */ +} + + +/* This function checks if the provided message shall be discarded and does so, if needed. + * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to + * provide real-time creation of spool files. + * Note: cached copies of iQueueSize is provided so that no mutex locks are required. + * The caller must have obtained them while the mutex was locked. Of course, these values may no + * longer be current, but that is OK for the discard check. At worst, the message is either processed + * or discarded when it should not have been. As discarding is in itself somewhat racy and erratic, + * that is no problems for us. This function MUST NOT lock the queue mutex, it could result in + * deadlocks! + * If the message is discarded, it can no longer be processed by the caller. So be sure to check + * the return state! + * rgerhards, 2008-01-24 + */ +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, smsg_t *pMsg) +{ + DEFiRet; + rsRetVal iRetLocal; + int iSeverity; + + ISOBJ_TYPE_assert(pThis, qqueue); + + if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { + iRetLocal = MsgGetSeverity(pMsg, &iSeverity); + if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { + DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", + iQueueSize, iSeverity); + STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); + msgDestruct(&pMsg); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } else { + DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg " + "(iRet: %d, severity %d)\n", iQueueSize, iRetLocal, iSeverity); + } + } + +finalize_it: + RETiRet; +} + + +/* Finally remove n elements from the queue store. + */ +static rsRetVal ATTR_NONNULL(1) +DoDeleteBatchFromQStore(qqueue_t *const pThis, const int nElem) +{ + int i; + off64_t bytesDel = 0; /* keep CLANG static anaylzer happy */ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + + /* now send delete request to storage driver */ + if(pThis->qType == QUEUETYPE_DISK) { + strmMultiFileSeek(pThis->tVars.disk.pReadDel, pThis->tVars.disk.deqFileNumOut, + pThis->tVars.disk.deqOffs, &bytesDel); + /* We need to correct the on-disk file size. This time it is a bit tricky: + * we free disk space only upon file deletion. So we need to keep track of what we + * have read until we get an out-offset that is lower than the in-offset (which + * indicates file change). Then, we can subtract the whole thing from the on-disk + * size. -- rgerhards, 2008-01-30 + */ + if(bytesDel != 0) { + pThis->tVars.disk.sizeOnDisk -= bytesDel; + DBGOPRINT((obj_t*) pThis, "doDeleteBatch: a %lld octet file has been deleted, now %lld " + "octets disk space used\n", (long long) bytesDel, pThis->tVars.disk.sizeOnDisk); + /* awake possibly waiting enq process */ + pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ + } + } else { /* memory queue */ + for(i = 0 ; i < nElem ; ++i) { + pThis->qDel(pThis); + } + } + + /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ + ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); +# ifdef ENABLE_IMDIAG +# ifdef HAVE_ATOMIC_BUILTINS + /* mutex is never used due to conditional compilation */ + ATOMIC_SUB(&iOverallQueueSize, nElem, &NULL); +# else + iOverallQueueSize -= nElem; /* racy, but we can't wait for a mutex! */ +# endif +# endif + ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); + DBGPRINTF("doDeleteBatch: delete batch from store, new sizes: log %d, phys %d\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + ++pThis->deqIDDel; /* one more batch dequeued */ + + if((pThis->qType == QUEUETYPE_DISK) && (bytesDel != 0)) { + qqueuePersist(pThis, QUEUE_CHECKPOINT); /* robustness persist .qi file */ + } + + RETiRet; +} + + +/* remove messages from the physical queue store that are fully processed. This is + * controlled via the to-delete list. + */ +static rsRetVal +DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) +{ + toDeleteLst_t *pTdl; + qDeqID deqIDDel; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pBatch != NULL); + +dbgprintf("rger: deleteBatchFromQStore, nElem %d\n", (int) pBatch->nElem); + pTdl = tdlPeek(pThis); /* get current head element */ + if(pTdl == NULL) { /* to-delete list empty */ + DoDeleteBatchFromQStore(pThis, pBatch->nElem); + } else if(pBatch->deqID == pThis->deqIDDel) { + deqIDDel = pThis->deqIDDel; + pTdl = tdlPeek(pThis); + while(pTdl != NULL && deqIDDel == pTdl->deqID) { + DoDeleteBatchFromQStore(pThis, pTdl->nElemDeq); + tdlPop(pThis); + ++deqIDDel; + pTdl = tdlPeek(pThis); + } + /* old entries deleted, now delete current ones... */ + DoDeleteBatchFromQStore(pThis, pBatch->nElem); + } else { + /* can not delete, insert into to-delete list */ + DBGPRINTF("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); + CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); + } + +finalize_it: + RETiRet; +} + + +/* Delete a batch of processed user objects from the queue, which includes + * destructing the objects themself. Any entries not marked as finally + * processed are enqueued again. The new enqueue is necessary because we have a + * rgerhards, 2009-05-13 + */ +static rsRetVal +DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) +{ + int i; + smsg_t *pMsg; + int nEnqueued = 0; + rsRetVal localRet; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pBatch != NULL); + + for(i = 0 ; i < pBatch->nElem ; ++i) { + pMsg = pBatch->pElem[i].pMsg; + DBGPRINTF("DeleteProcessedBatch: etry %d state %d\n", i, pBatch->eltState[i]); + if( pBatch->eltState[i] == BATCH_STATE_RDY + || pBatch->eltState[i] == BATCH_STATE_SUB) { + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); + ++nEnqueued; + if(localRet != RS_RET_OK) { + DBGPRINTF("DeleteProcessedBatch: error %d re-enqueuing unprocessed " + "data element - discarded\n", localRet); + } + } + msgDestruct(&pMsg); + } + + DBGPRINTF("DeleteProcessedBatch: we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + + if(nEnqueued > 0) + qqueueChkPersist(pThis, nEnqueued); + + iRet = DeleteBatchFromQStore(pThis, pBatch); + + pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ // TODO: more fine init, new fields! 2010-06-14 + + RETiRet; +} + + +/* dequeue as many user pointers as are available, until we hit the configured + * upper limit of pointers. Note that this function also deletes all processed + * objects from the previous batch. However, it is perfectly valid that the + * previous batch contained NO objects at all. For example, this happens + * immediately after system startup or when a queue was exhausted and the queue + * worker needed to wait for new data. + * This must only be called when the queue mutex is LOOKED, otherwise serious + * malfunction will happen. + */ +static rsRetVal ATTR_NONNULL() +DequeueConsumableElements(qqueue_t *const pThis, wti_t *const pWti, + int *const piRemainingQueueSize, int *const pSkippedMsgs) +{ + int nDequeued; + int nDiscarded; + int nDeleted; + int iQueueSize; + int keep_running = 1; + struct timespec timeout; + smsg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + nDeleted = pWti->batch.nElemDeq; + DeleteProcessedBatch(pThis, &pWti->batch); + + nDequeued = nDiscarded = 0; + if(pThis->qType == QUEUETYPE_DISK) { + pThis->tVars.disk.deqFileNumIn = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq); + } + + /* work-around clang static analyzer false positive, we need a const value */ + const int iMinDeqBatchSize = pThis->iMinDeqBatchSize; + if(iMinDeqBatchSize > 0) { + timeoutComp(&timeout, pThis->toMinDeqBatchSize);/* get absolute timeout */ + } + + while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { + int rd_fd = -1; + int64_t rd_offs = 0; + int wr_fd = -1; + int64_t wr_offs = 0; + if(pThis->tVars.disk.pReadDeq != NULL) { + rd_fd = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq); + rd_offs = pThis->tVars.disk.pReadDeq->iCurrOffs; + } + if(pThis->tVars.disk.pWrite != NULL) { + wr_fd = strmGetCurrFileNum(pThis->tVars.disk.pWrite); + wr_offs = pThis->tVars.disk.pWrite->iCurrOffs; + } + if(rd_fd != -1 && rd_fd == wr_fd && rd_offs == wr_offs) { + DBGPRINTF("problem on disk queue '%s': " + //"queue size log %d, phys %d, but rd_fd=wr_rd=%d and offs=%lld\n", + "queue size log %d, phys %d, but rd_fd=wr_rd=%d and offs=%" PRId64 "\n", + obj.GetName((obj_t*) pThis), iQueueSize, pThis->iQueueSize, + rd_fd, rd_offs); + *pSkippedMsgs = iQueueSize; +# ifdef ENABLE_IMDIAG + iOverallQueueSize -= iQueueSize; +# endif + pThis->iQueueSize -= iQueueSize; + iQueueSize = 0; + break; + } + + localRet = qqueueDeq(pThis, &pMsg); + if(localRet == RS_RET_FILE_NOT_FOUND) { + DBGPRINTF("fatal error on disk queue '%s': file '%s' " + "not found, queue size said to be %d", + obj.GetName((obj_t*) pThis), "...", iQueueSize); + } + CHKiRet(localRet); + + /* check if we should discard this element */ + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg); + if(localRet == RS_RET_QUEUE_FULL) { + ++nDiscarded; + continue; + } else if(localRet != RS_RET_OK) { + ABORT_FINALIZE(localRet); + } + + /* all well, use this element */ + pWti->batch.pElem[nDequeued].pMsg = pMsg; + pWti->batch.eltState[nDequeued] = BATCH_STATE_RDY; + ++nDequeued; + if(nDequeued < iMinDeqBatchSize && getLogicalQueueSize(pThis) == 0) { + while(!pThis->bShutdownImmediate + && keep_running + && nDequeued < iMinDeqBatchSize + && getLogicalQueueSize(pThis) == 0) { + dbgprintf("%s minDeqBatchSize doing wait, batch is %d messages, " + "queue size %d\n", obj.GetName((obj_t*) pThis), + nDequeued, getLogicalQueueSize(pThis)); + if(wtiWaitNonEmpty(pWti, timeout) == 0) { /* timeout? */ + DBGPRINTF("%s minDeqBatchSize timeout, batch is %d messages\n", + obj.GetName((obj_t*) pThis), nDequeued); + keep_running = 0; + } + } + } + if(keep_running) { + keep_running = (getLogicalQueueSize(pThis) > 0) + && (nDequeued < pThis->iDeqBatchSize); + } + } + + if(pThis->qType == QUEUETYPE_DISK) { + strm.GetCurrOffset(pThis->tVars.disk.pReadDeq, &pThis->tVars.disk.deqOffs); + pThis->tVars.disk.deqFileNumOut = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq); + } + + /* it is sufficient to persist only when the bulk of work is done */ + qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted); + + /* If messages where DISCARDED, we need to substract them from the OverallQueueSize */ +# ifdef ENABLE_IMDIAG +# ifdef HAVE_ATOMIC_BUILTINS + ATOMIC_SUB(&iOverallQueueSize, nDiscarded, &NULL); +# else + iOverallQueueSize -= nDiscarded; /* racy, but we can't wait for a mutex! */ +# endif + DBGOPRINT((obj_t*) pThis, "dequeued %d discarded %d QueueSize %d consumable elements, szlog %d sz phys %d\n", + nDequeued, nDiscarded, iOverallQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +# else + DBGOPRINT((obj_t*) pThis, "dequeued %d discarded %d consumable elements, szlog %d sz phys %d\n", + nDequeued, nDiscarded, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +# endif + + pWti->batch.nElem = nDequeued; + pWti->batch.nElemDeq = nDequeued + nDiscarded; + pWti->batch.deqID = getNextDeqID(pThis); + *piRemainingQueueSize = iQueueSize; +finalize_it: + RETiRet; +} + + +/* dequeue the queued object for the queue consumers. + * rgerhards, 2008-10-21 + * I made a radical change - we now dequeue multiple elements, and store these objects in + * an array of user pointers. We expect that this increases performance. + * rgerhards, 2009-04-22 + */ +static rsRetVal +DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int *const pSkippedMsgs) +{ + DEFiRet; + int iQueueSize = 0; /* keep the compiler happy... */ + + *pSkippedMsgs = 0; + /* dequeue element batch (still protected from mutex) */ + iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize, pSkippedMsgs); + if(*pSkippedMsgs > 0) { + LogError(0, RS_RET_ERR, "%s: lost %d messages from diskqueue (invalid .qi file)", + obj.GetName((obj_t*)pThis), *pSkippedMsgs); + } + + /* awake some flow-controlled sources if we can do this right now */ + /* TODO: this could be done better from a performance point of view -- do it only if + * we have someone waiting for the condition (or only when we hit the watermark right + * on the nail [exact value]) -- rgerhards, 2008-03-14 + * now that we dequeue batches of pointers, this is much less an issue... + * rgerhards, 2009-04-22 + */ + if(iQueueSize < pThis->iFullDlyMrk / 2 || glbl.GetGlobalInputTermState() == 1) { + pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); + } + + if(iQueueSize < pThis->iLightDlyMrk / 2) { + pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); + } + + pthread_cond_signal(&pThis->notFull); + /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ + + if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { + LogError(0, iRet, "%s: error dequeueing element - ignoring, " + "but strange things may happen", obj.GetName((obj_t*)pThis)); + } + + RETiRet; +} + + +/* The rate limiter + * + * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when + * it actually delays processing. Otherwise inputs are stalled. + * + * Here we may wait if a dequeue time window is defined or if we are + * rate-limited. TODO: If we do so, we should also look into the + * way new worker threads are spawned. Obviously, it doesn't make much + * sense to spawn additional worker threads when none of them can do any + * processing. However, it is deemed acceptable to allow this for an initial + * implementation of the timeframe/rate limiting feature. + * Please also note that these feature could also be implemented at the action + * level. However, that would limit them to be used together with actions. We have + * taken the broader approach, moving it right into the queue. This is even + * necessary if we want to prevent spawning of multiple unnecessary worker + * threads as described above. -- rgerhards, 2008-04-02 + * + * + * time window: tCurr is current time; tFrom is start time, tTo is end time (in mil 24h format). + * We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy + * we may also have tFrom= 22, tTo = 4 -> run from 10pm to 4am, which is actually two + * windows: 0-4; 22-23:59 + * so when to run? Let's assume we have 3am + * + * if(tTo < tFrom) { + * if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22]) + * do work + * else + * sleep for tFrom - tCurr "hours" [22 - 5 --> 17] + * } else { + * if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10]) + * do work + * else + * sleep for tTo - tCurr "hours" [4 - 3 --> 1] + * } + * + * Bottom line: we need to check which type of window we have and need to adjust our + * logic accordingly. Of course, sleep calculations need to be done up to the minute, + * but you get the idea from the code above. + */ +static rsRetVal +RateLimiter(qqueue_t *pThis) +{ + DEFiRet; + int iDelay; + int iHrCurr; + time_t tCurr; + struct tm m; + + ISOBJ_TYPE_assert(pThis, qqueue); + + iDelay = 0; + if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ + /* time calls are expensive, so only do them when needed */ + datetime.GetTime(&tCurr); + localtime_r(&tCurr, &m); + iHrCurr = m.tm_hour; + + if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) { + if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) { + ; /* do not delay */ + } else { + iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600; + /* this time, we are already into the next hour, so we need + * to subtract our current minute and seconds. + */ + iDelay -= m.tm_min * 60; + iDelay -= m.tm_sec; + } + } else { + if(iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) { + ; /* do not delay */ + } else { + if(iHrCurr < pThis->iDeqtWinFromHr) { + iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600; + /* -1 as we are already in the hour */ + iDelay += (60 - m.tm_min) * 60; + iDelay += 60 - m.tm_sec; + } else { + iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600; + /* this time, we are already into the next hour, so we need + * to subtract our current minute and seconds. + */ + iDelay -= m.tm_min * 60; + iDelay -= m.tm_sec; + } + } + } + } + + if(iDelay > 0) { + pthread_mutex_unlock(pThis->mut); + DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); + srSleep(iDelay, 0); + pthread_mutex_lock(pThis->mut); + } + + RETiRet; +} + + +/* This dequeues the next batch. Note that this function must not be + * cancelled, else it will leave back an inconsistent state. + * rgerhards, 2009-05-20 + */ +static rsRetVal +DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int *const pSkippedMsgs) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ISOBJ_TYPE_assert(pWti, wti); + + CHKiRet(DequeueConsumable(pThis, pWti, pSkippedMsgs)); + + if(pWti->batch.nElem == 0) + ABORT_FINALIZE(RS_RET_IDLE); + + +finalize_it: + RETiRet; +} + + +/* This is called when a batch is processed and the worker does not + * ask for another batch (e.g. because it is to be terminated) + * Note that we must not be terminated while we delete a processed + * batch. Otherwise, we may not complete it, and then the cancel + * handler also tries to delete the batch. But then it finds some of + * the messages already destructed. This was a bug we have seen, especially + * with disk mode, where a delete takes rather long. Anyhow, the coneptual + * problem exists in all queue modes. + * rgerhards, 2009-05-27 + */ +static rsRetVal +batchProcessed(qqueue_t *pThis, wti_t *pWti) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ISOBJ_TYPE_assert(pWti, wti); + + int iCancelStateSave; + /* at this spot, we must not be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + DeleteProcessedBatch(pThis, &pWti->batch); + qqueueChkPersist(pThis, pWti->batch.nElemDeq); + pthread_setcancelstate(iCancelStateSave, NULL); + + RETiRet; +} + + +/* This is the queue consumer in the regular (non-DA) case. It is + * protected by the queue mutex, but MUST release it as soon as possible. + * rgerhards, 2008-01-21 + */ +static rsRetVal +ConsumerReg(qqueue_t *pThis, wti_t *pWti) +{ + int iCancelStateSave; + int bNeedReLock = 0; /**< do we need to lock the mutex again? */ + int skippedMsgs = 0; /**< did the queue loose any messages (can happen with + ** disk queue if .qi file is corrupt */ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ISOBJ_TYPE_assert(pWti, wti); + + iRet = DequeueForConsumer(pThis, pWti, &skippedMsgs); + if(iRet == RS_RET_FILE_NOT_FOUND) { + /* This is a fatal condition and means the queue is almost unusable */ + d_pthread_mutex_unlock(pThis->mut); + DBGOPRINT((obj_t*) pThis, "got 'file not found' error %d, queue defunct\n", iRet); + iRet = queueSwitchToEmergencyMode(pThis, iRet); + // TODO: think about what to return as iRet -- keep RS_RET_FILE_NOT_FOUND? + d_pthread_mutex_lock(pThis->mut); + } + if (iRet != RS_RET_OK) { + FINALIZE; + } + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + bNeedReLock = 1; + + /* report errors, now that we are outside of queue lock */ + if(skippedMsgs > 0) { + LogError(0, 0, "problem on disk queue '%s': " + "queue files contain %d messages fewer than specified " + "in .qi file -- we lost those messages. That's all we know.", + obj.GetName((obj_t*) pThis), skippedMsgs); + } + + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); + + + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti)); + + /* we now need to check if we should deliberately delay processing a bit + * and, if so, do that. -- rgerhards, 2008-01-30 + */ + if(pThis->iDeqSlowdown) { + DBGOPRINT((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", + pThis->iDeqSlowdown); + srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); + } + + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(iCancelStateSave, NULL); + +finalize_it: + DBGPRINTF("regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + + /* now we are done, but potentially need to re-acquire the mutex */ + if(bNeedReLock) + d_pthread_mutex_lock(pThis->mut); + + RETiRet; +} + + +/* This is a special consumer to feed the disk-queue in disk-assisted mode. + * When active, our own queue more or less acts as a memory buffer to the disk. + * So this consumer just needs to drain the memory queue and submit entries + * to the disk queue. The disk queue will then call the actual consumer from + * the app point of view (we chain two queues here). + * When this method is entered, the mutex is always locked and needs to be unlocked + * as part of the processing. + * rgerhards, 2008-01-14 + */ +static rsRetVal +ConsumerDA(qqueue_t *pThis, wti_t *pWti) +{ + int i; + int iCancelStateSave; + int bNeedReLock = 0; /**< do we need to lock the mutex again? */ + int skippedMsgs = 0; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ISOBJ_TYPE_assert(pWti, wti); + + CHKiRet(DequeueForConsumer(pThis, pWti, &skippedMsgs)); + + /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ + d_pthread_mutex_unlock(pThis->mut); + bNeedReLock = 1; + + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); + + /* iterate over returned results and enqueue them in DA queue */ + for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { + iRet = qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg)); + if(iRet != RS_RET_OK) { + if(iRet == RS_RET_ERR_QUEUE_EMERGENCY) { + /* Queue emergency error occurred */ + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg caught RS_RET_ERR_QUEUE_EMERGENCY," + "aborting loop.\n"); + FINALIZE; + } else { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg item (%d) returned " + "with error state: '%d'\n", i, iRet); + } + } + pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */ + } + + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(iCancelStateSave, NULL); + +finalize_it: + /* Check the last return state of qqueueEnqMsg. If an error was returned, we acknowledge it only. + * Unless the error code is RS_RET_ERR_QUEUE_EMERGENCY, we reset the return state to RS_RET_OK. + * Otherwise the Caller functions would run into an infinite Loop trying to enqueue the + * same messages over and over again. + * + * However we do NOT overwrite positive return states like + * RS_RET_TERMINATE_NOW, + * RS_RET_NO_RUN, + * RS_RET_IDLE, + * RS_RET_TERMINATE_WHEN_IDLE + * These return states are important for Queue handling of the upper laying functions. + * RGer: Note that checking for iRet < 0 is a bit bold. In theory, positive iRet + * values are "OK" states, and things that the caller shall deal with. However, + * this has not been done so consistently. Andre convinced me that the current + * code is an elegant solution. However, if problems with queue workers and/or + * shutdown come up, this code here should be looked at suspiciously. In those + * cases it may work out to check all status codes explicitely, just to avoid + * a pitfall due to unexpected states being passed on to the caller. + */ + if( iRet != RS_RET_OK && + iRet != RS_RET_ERR_QUEUE_EMERGENCY && + iRet < 0) { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg Resetting iRet from %d back to RS_RET_OK\n", iRet); + iRet = RS_RET_OK; + } else { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg returns with iRet %d\n", iRet); + } + + /* now we are done, but potentially need to re-acquire the mutex */ + if(bNeedReLock) + d_pthread_mutex_lock(pThis->mut); + + RETiRet; +} + + +/* must only be called when the queue mutex is locked, else results + * are not stable! + */ +static rsRetVal +qqueueChkStopWrkrDA(qqueue_t *pThis) +{ + DEFiRet; + + DBGPRINTF("rger: chkStopWrkrDA called, low watermark %d, log Size %d, phys Size %d, bEnqOnly %d\n", + pThis->iLowWtrMrk, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), pThis->bEnqOnly); + if(pThis->bEnqOnly) { + iRet = RS_RET_TERMINATE_WHEN_IDLE; + } + if(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk) { + iRet = RS_RET_TERMINATE_NOW; + } + + RETiRet; +} + + +/* must only be called when the queue mutex is locked, else results + * are not stable! + * If we are a child, we have done our duty when the queue is empty. In that case, + * we can terminate. Version for the regular worker thread. + */ +static rsRetVal +ChkStopWrkrReg(qqueue_t *pThis) +{ + DEFiRet; + /*DBGPRINTF("XXXX: chkStopWrkrReg called, low watermark %d, log Size %d, phys Size %d, bEnqOnly %d\n", + pThis->iLowWtrMrk, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), pThis->bEnqOnly);*/ + if(pThis->bEnqOnly) { + iRet = RS_RET_TERMINATE_NOW; + } else if(pThis->pqParent != NULL) { + iRet = RS_RET_TERMINATE_WHEN_IDLE; + } + + RETiRet; +} + + +/* return the configured "deq max at once" interval + * rgerhards, 2009-04-22 + */ +static rsRetVal +GetDeqBatchSize(qqueue_t *pThis, int *pVal) +{ + DEFiRet; + assert(pVal != NULL); + *pVal = pThis->iDeqBatchSize; + RETiRet; +} + + +/* start up the queue - it must have been constructed and parameters defined + * before. + */ +rsRetVal +qqueueStart(rsconf_t *cnf, qqueue_t *pThis) /* this is the ConstructionFinalizer */ +{ + DEFiRet; + uchar pszBuf[64]; + uchar pszQIFNam[MAXFNAME]; + int wrk; + uchar *qName; + size_t lenBuf; + + assert(pThis != NULL); + + /* do not modify the queue if it's already running(happens when dynamic config reload is invoked + * and the queue is used in the new config as well) + */ + if (pThis->isRunning) + FINALIZE; + + dbgoprint((obj_t*) pThis, "starting queue\n"); + + if(pThis->pszSpoolDir == NULL) { + /* note: we need to pick the path so late as we do not have + * the workdir during early config load + */ + if((pThis->pszSpoolDir = (uchar*) strdup((char*)glbl.GetWorkDir(cnf))) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir); + } + /* set type-specific handlers and other very type-specific things + * (we can not totally hide it...) + */ + switch(pThis->qType) { + case QUEUETYPE_FIXED_ARRAY: + pThis->qConstruct = qConstructFixedArray; + pThis->qDestruct = qDestructFixedArray; + pThis->qAdd = qAddFixedArray; + pThis->qDeq = qDeqFixedArray; + pThis->qDel = qDelFixedArray; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; + break; + case QUEUETYPE_LINKEDLIST: + pThis->qConstruct = qConstructLinkedList; + pThis->qDestruct = qDestructLinkedList; + pThis->qAdd = qAddLinkedList; + pThis->qDeq = qDeqLinkedList; + pThis->qDel = qDelLinkedList; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; + break; + case QUEUETYPE_DISK: + pThis->qConstruct = qConstructDisk; + pThis->qDestruct = qDestructDisk; + pThis->qAdd = qAddDisk; + pThis->qDeq = qDeqDisk; + pThis->qDel = NULL; /* delete for disk handled via special code! */ + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; + /* pre-construct file name for .qi file */ + pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam), + "%s/%s.qi", (char*) pThis->pszSpoolDir, (char*)pThis->pszFilePrefix); + pThis->pszQIFNam = ustrdup(pszQIFNam); + DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam, + (int) pThis->lenQIFNam); + break; + case QUEUETYPE_DIRECT: + pThis->qConstruct = qConstructDirect; + pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ + pThis->qAdd = qAddDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; + break; + } + + /* finalize some initializations that could not yet be done because it is + * influenced by properties which might have been set after queueConstruct () + */ + if(pThis->pqParent == NULL) { + CHKmalloc(pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t))); + pthread_mutex_init(pThis->mut, NULL); + } else { + /* child queue, we need to use parent's mutex */ + DBGOPRINT((obj_t*) pThis, "I am a child\n"); + pThis->mut = pThis->pqParent->mut; + } + + pthread_mutex_init(&pThis->mutThrdMgmt, NULL); + pthread_cond_init (&pThis->notFull, NULL); + pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL); + pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL); + + /* call type-specific constructor */ + CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ + + /* re-adjust some params if required */ + if(pThis->bIsDA) { + /* if we are in DA mode, we must make sure full delayable messages do not + * initiate going to disk! + */ + wrk = pThis->iHighWtrMrk - (pThis->iHighWtrMrk / 100) * 50; /* 50% of high water mark */ + if(wrk < pThis->iFullDlyMrk) + pThis->iFullDlyMrk = wrk; + } + + DBGOPRINT((obj_t*) pThis, "params: type %d, enq-only %d, disk assisted %d, spoolDir '%s', maxFileSz %lld, " + "maxQSize %d, lqsize %d, pqsize %d, child %d, full delay %d, " + "light delay %d, deq batch size %d, min deq batch size %d, " + "high wtrmrk %d, low wtrmrk %d, " + "discardmrk %d, max wrkr %d, min msgs f. wrkr %d " + "takeFlowCtlFromMsg %d\n", + pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->pszSpoolDir, + pThis->iMaxFileSize, pThis->iMaxQueueSize, + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), + pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk, + pThis->iDeqBatchSize, pThis->iMinDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk, + pThis->iDiscardMrk, (int) pThis->iNumWorkerThreads, (int) pThis->iMinMsgsPerWrkr, + pThis->takeFlowCtlFromMsg); + + pThis->bQueueStarted = 1; + if(pThis->qType == QUEUETYPE_DIRECT) + FINALIZE; /* with direct queues, we are already finished... */ + + /* create worker thread pools for regular and DA operation. + */ + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%.*s:Reg", + (int) (sizeof(pszBuf)-16), + obj.GetName((obj_t*) pThis)); /* leave some room inside the name for suffixes */ + if(lenBuf >= sizeof(pszBuf)) { + LogError(0, RS_RET_INTERNAL_ERROR, "%s:%d debug header too long: %zd - in " + "thory this cannot happen - truncating", __FILE__, __LINE__, lenBuf); + lenBuf = sizeof(pszBuf)-1; + pszBuf[lenBuf] = '\0'; + } + CHKiRet(wtpConstruct (&pThis->pWtpReg)); + CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); + CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); + CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); + CHKiRet(wtpSettoWrkShutdown (pThis->pWtpReg, pThis->toWrkShutdown)); + CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); + CHKiRet(wtpConstructFinalize (pThis->pWtpReg)); + + /* set up DA system if we have a disk-assisted queue */ + if(pThis->bIsDA) + InitDA(pThis, LOCK_MUTEX); /* initiate DA mode */ + + DBGOPRINT((obj_t*) pThis, "queue finished initialization\n"); + + /* if the queue already contains data, we need to start the correct number of worker threads. This can be + * the case when a disk queue has been loaded. If we did not start it here, it would never start. + */ + qqueueAdviseMaxWorkers(pThis); + + /* support statistics gathering */ + qName = obj.GetName((obj_t*)pThis); + CHKiRet(statsobj.Construct(&pThis->statsobj)); + CHKiRet(statsobj.SetName(pThis->statsobj, qName)); + CHKiRet(statsobj.SetOrigin(pThis->statsobj, (uchar*)"core.queue")); + /* we need to save the queue size, as the stats module initializes it to 0! */ + /* iQueueSize is a dual-use counter: no init, no mutex! */ + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("size"), + ctrType_Int, CTR_FLAG_NONE, &pThis->iQueueSize)); + + STATSCOUNTER_INIT(pThis->ctrEnqueued, pThis->mutCtrEnqueued); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("enqueued"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrEnqueued)); + + STATSCOUNTER_INIT(pThis->ctrFull, pThis->mutCtrFull); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFull)); + + STATSCOUNTER_INIT(pThis->ctrFDscrd, pThis->mutCtrFDscrd); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.full"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFDscrd)); + STATSCOUNTER_INIT(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.nf"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrNFDscrd)); + + pThis->ctrMaxqsize = 0; /* no mutex needed, thus no init call */ + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"), + ctrType_Int, CTR_FLAG_NONE, &pThis->ctrMaxqsize)); + + CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); + +finalize_it: + if(iRet != RS_RET_OK) { + /* note: a child uses it's parent mutex, so do not delete it! */ + if(pThis->pqParent == NULL && pThis->mut != NULL) + free(pThis->mut); + } else { + pThis->isRunning = 1; + } + RETiRet; +} + + +/* persist the queue to disk (write the .qi file). If we have something to persist, we first + * save the information on the queue properties itself and then we call + * the queue-type specific drivers. + * Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint, + * and 0 otherwise. + * rgerhards, 2008-01-10 + */ +static rsRetVal +qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) +{ + DEFiRet; + char *tmpQIFName = NULL; + strm_t *psQIF = NULL; /* Queue Info File */ + char errStr[1024]; + + assert(pThis != NULL); + + if(pThis->qType != QUEUETYPE_DISK) { + if(getPhysicalQueueSize(pThis) > 0) { + /* This error code is OK, but we will probably not implement this any time + * The reason is that persistence happens via DA queues. But I would like to + * leave the code as is, as we so have a hook in case we need one. + * -- rgerhards, 2008-01-28 + */ + ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); + } else + FINALIZE; /* if the queue is empty, we are happy and done... */ + } + + DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis)); + + if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) { + if(pThis->bNeedDelQIF) { + unlink((char*)pThis->pszQIFNam); + pThis->bNeedDelQIF = 0; + } + /* indicate spool file needs to be deleted */ + if(pThis->tVars.disk.pReadDel != NULL) /* may be NULL if we had a startup failure! */ + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); + FINALIZE; /* nothing left to do, so be happy */ + } + + int lentmpQIFName; +#ifdef _AIX + lentmpQIFName = strlen( pThis->pszQIFNam) + strlen(".tmp") + 1; + tmpQIFName = malloc(sizeof(char)*lentmpQIFName); + if(tmpQIFName == NULL) + tmpQIFName = (char*)pThis->pszQIFNam; + snprintf(tmpQIFName, lentmpQIFName, "%s.tmp", pThis->pszQIFNam); +#else + lentmpQIFName = asprintf((char **)&tmpQIFName, "%s.tmp", pThis->pszQIFNam); + if(tmpQIFName == NULL) + tmpQIFName = (char*)pThis->pszQIFNam; +#endif + + CHKiRet(strm.Construct(&psQIF)); + CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC)); + CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles)); + CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strm.SetFName(psQIF, (uchar*) tmpQIFName, lentmpQIFName)); + CHKiRet(strm.ConstructFinalize(psQIF)); + + /* first, write the property bag for ourselfs + * And, surprisingly enough, we currently need to persist only the size of the + * queue. All the rest is re-created with then-current config parameters when the + * queue is re-created. Well, we'll also save the current queue type, just so that + * we know when somebody has changed the queue type... -- rgerhards, 2008-01-11 + */ + CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis)); + objSerializeSCALAR(psQIF, iQueueSize, INT); + objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64); + CHKiRet(obj.EndSerialize(psQIF)); + + /* now persist the stream info */ + if(pThis->tVars.disk.pWrite != NULL) + CHKiRet(strm.Serialize(pThis->tVars.disk.pWrite, psQIF)); + if(pThis->tVars.disk.pReadDel != NULL) + CHKiRet(strm.Serialize(pThis->tVars.disk.pReadDel, psQIF)); + + strm.Destruct(&psQIF); + if(tmpQIFName != (char*)pThis->pszQIFNam) { /* pointer, not string comparison! */ + if(rename(tmpQIFName, (char*)pThis->pszQIFNam) != 0) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + DBGOPRINT((obj_t*) pThis, + "FATAL error: renaming temporary .qi file failed: %s\n", + errStr); + ABORT_FINALIZE(RS_RET_RENAME_TMP_QI_ERROR); + } + } + + /* tell the input file object that it must not delete the file on close if the queue + * is non-empty - but only if we are not during a simple checkpoint + */ + if(bIsCheckpoint != QUEUE_CHECKPOINT + && pThis->tVars.disk.pReadDel != NULL) { + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 0)); + } + + /* we have persisted the queue object. So whenever it comes to an empty queue, + * we need to delete the QIF. Thus, we indicte that need. + */ + pThis->bNeedDelQIF = 1; + +finalize_it: + if(tmpQIFName != (char*)pThis->pszQIFNam) /* pointer, not string comparison! */ + free(tmpQIFName); + if(psQIF != NULL) + strm.Destruct(&psQIF); + + RETiRet; +} + + +/* check if we need to persist the current queue info. If an + * error occurs, this should be ignored by caller (but we still + * abide to our regular call interface)... + * rgerhards, 2008-01-13 + * nUpdates is the number of updates since the last call to this function. + * It may be > 1 due to batches. -- rgerhards, 2009-05-12 + */ +static rsRetVal qqueueChkPersist(qqueue_t *const pThis, const int nUpdates) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, qqueue); + assert(nUpdates >= 0); + + if(nUpdates == 0) + FINALIZE; + + pThis->iUpdsSincePersist += nUpdates; + if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { + qqueuePersist(pThis, QUEUE_CHECKPOINT); + pThis->iUpdsSincePersist = 0; + } + +finalize_it: + RETiRet; +} + + +/* persist a queue with all data elements to disk - this is used to handle + * bSaveOnShutdown. We utilize the DA worker to do this. This must only + * be called after all workers have been shut down and if bSaveOnShutdown + * is actually set. Note that this function may potentially run long, + * depending on the queue configuration (e.g. store on remote machine). + * rgerhards, 2009-05-26 + */ +static rsRetVal +DoSaveOnShutdown(qqueue_t *pThis) +{ + struct timespec tTimeout; + rsRetVal iRetLocal; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + + /* we reduce the low water mark, otherwise the DA worker would terminate when + * it is reached. + */ + DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown set, restarting DA worker...\n"); + pThis->bShutdownImmediate = 0; /* would termiante the DA worker! */ + pThis->iLowWtrMrk = 0; + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN); /* shutdown worker (only) when done (was _IMMEDIATE!) */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1, PERMIT_WORKER_START_DURING_SHUTDOWN); /* restart DA worker */ + + DBGOPRINT((obj_t*) pThis, "waiting for DA worker to terminate...\n"); + timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); + /* and run the primary queue's DA worker to drain the queue */ + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); + DBGOPRINT((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", + iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + if(iRetLocal != RS_RET_OK) { + DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary " + "queue in disk save mode, continuing, but results are unpredictable\n", iRetLocal); + } + + RETiRet; +} + + +/* destructor for the queue object */ +BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(qqueue) + DBGOPRINT((obj_t*) pThis, "shutdown: begin to destruct queue\n"); + if(ourConf->globals.shutdownQueueDoubleSize) { + pThis->iHighWtrMrk *= 2; + pThis->iMaxQueueSize *= 2; + } + if(pThis->bQueueStarted) { + /* shut down all workers + * We do not need to shutdown workers when we are in enqueue-only mode or we are a + * direct queue - because in both cases we have none... ;) + * with a child! -- rgerhards, 2008-01-28 + */ + if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL + && pThis->pWtpReg != NULL) + qqueueShutdownWorkers(pThis); + + if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0){ + if(pThis->bSaveOnShutdown) { + LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO, + "%s: queue holds %d messages after shutdown of workers. " + "queue.saveonshutdown is set, so data will now be spooled to disk", + objGetName((obj_t*) pThis), getPhysicalQueueSize(pThis)); + CHKiRet(DoSaveOnShutdown(pThis)); + } else { + LogMsg(0, RS_RET_TIMED_OUT, LOG_WARNING, + "%s: queue holds %d messages after shutdown of workers. " + "queue.saveonshutdown is NOT set, so data will be discarded.", + objGetName((obj_t*) pThis), getPhysicalQueueSize(pThis)); + } + } + + /* finally destruct our (regular) worker thread pool + * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, + * e.g. when they are not created in enqueue-only mode. We already check the condition + * as this may otherwise be very hard to find once we optimize (and have long forgotten + * about this condition here ;) + * rgerhards, 2008-01-25 + */ + if(pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) { + wtpDestruct(&pThis->pWtpReg); + } + + /* Now check if we actually have a DA queue and, if so, destruct it. + * Note that the wtp must be destructed first, it may be in cancel cleanup handler + * *right now* and actually *need* to access the queue object to persist some final + * data (re-queueing case). So we need to destruct the wtp first, which will make + * sure all workers have terminated. Please note that this also generates a situation + * where it is possible that the DA queue has a parent pointer but the parent has + * no WtpDA associated with it - which is perfectly legal thanks to this code here. + */ + if(pThis->pWtpDA != NULL) { + wtpDestruct(&pThis->pWtpDA); + } + if(pThis->pqDA != NULL) { + qqueueDestruct(&pThis->pqDA); + } + + /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) + * This handler is most important for disk queues, it will finally persist the necessary + * on-disk structures. In theory, other queueing modes may implement their other (non-DA) + * methods of persisting a queue between runs, but in practice all of this is done via + * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here + * if need arises (what I doubt...) -- rgerhards, 2008-01-25 + */ + CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) { + DBGOPRINT((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); + } + + /* finally, clean up some simple things... */ + if(pThis->pqParent == NULL) { + /* if we are not a child, we allocated our own mutex, which we now need to destroy */ + pthread_mutex_destroy(pThis->mut); + free(pThis->mut); + } + pthread_mutex_destroy(&pThis->mutThrdMgmt); + pthread_cond_destroy(&pThis->notFull); + pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); + pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); + + DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq); + + /* type-specific destructor */ + iRet = pThis->qDestruct(pThis); + } + + free(pThis->pszFilePrefix); + free(pThis->pszSpoolDir); + if(pThis->useCryprov) { + pThis->cryprov.Destruct(&pThis->cryprovData); + obj.ReleaseObj(__FILE__, pThis->cryprovNameFull+2, pThis->cryprovNameFull, + (void*) &pThis->cryprov); + free(pThis->cryprovName); + free(pThis->cryprovNameFull); + } + + /* some queues do not provide stats and thus have no statsobj! */ + if(pThis->statsobj != NULL) + statsobj.Destruct(&pThis->statsobj); +ENDobjDestruct(qqueue) + + +/* set the queue's spool directory. The directory MUST NOT be NULL. + * The passed-in string is duplicated. So if the caller does not need + * it any longer, it must free it. + */ +rsRetVal +qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir) +{ + DEFiRet; + + free(pThis->pszSpoolDir); + CHKmalloc(pThis->pszSpoolDir = ustrdup(pszSpoolDir)); + pThis->lenSpoolDir = lenSpoolDir; + +finalize_it: + RETiRet; +} + + +/* set the queue's file prefix + * The passed-in string is duplicated. So if the caller does not need + * it any longer, it must free it. + * rgerhards, 2008-01-09 + */ +rsRetVal +qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) +{ + DEFiRet; + + free(pThis->pszFilePrefix); + pThis->pszFilePrefix = NULL; + + if(pszPrefix == NULL) /* just unset the prefix! */ + ABORT_FINALIZE(RS_RET_OK); + + if((pThis->pszFilePrefix = malloc(iLenPrefix + 1)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); + pThis->lenFilePrefix = iLenPrefix; + +finalize_it: + RETiRet; +} + +/* set the queue's maximum file size + * rgerhards, 2008-01-09 + */ +rsRetVal +qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + + if(iMaxFileSize < 1024) { + ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW); + } + + pThis->iMaxFileSize = iMaxFileSize; + +finalize_it: + RETiRet; +} + + +/* enqueue a single data object. + * Note that the queue mutex MUST already be locked when this function is called. + * rgerhards, 2009-06-16 + */ +static rsRetVal +doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, smsg_t *pMsg) +{ + DEFiRet; + int err; + struct timespec t; + + STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued); + /* first check if we need to discard this message (which will cause CHKiRet() to exit) + */ + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg)); + + /* handle flow control + * There are two different flow control mechanisms: basic and advanced flow control. + * Basic flow control has always been implemented and protects the queue structures + * in that it makes sure no more data is enqueued than the queue is configured to + * support. Enhanced flow control is being added today. There are some sources which + * can easily be stopped, e.g. a file reader. This is the case because it is unlikely + * that blocking those sources will have negative effects (after all, the file is + * continued to be written). Other sources can somewhat be blocked (e.g. the kernel + * log reader or the local log stream reader): in general, nothing is lost if messages + * from these sources are not picked up immediately. HOWEVER, they can not block for + * an extended period of time, as this either causes message loss or - even worse - some + * other bad effects (e.g. unresponsive system in respect to the main system log socket). + * Finally, there are some (few) sources which can not be blocked at all. UDP syslog is + * a prime example. If a UDP message is not received, it is simply lost. So we can't + * do anything against UDP sockets that come in too fast. The core idea of advanced + * flow control is that we take into account the different natures of the sources and + * select flow control mechanisms that fit these needs. This also means, in the end + * result, that non-blockable sources like UDP syslog receive priority in the system. + * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14 + */ + if(unlikely(pThis->takeFlowCtlFromMsg)) { /* recommendation is NOT to use this option */ + flowCtlType = pMsg->flowCtlType; + } + if(flowCtlType == eFLOWCTL_FULL_DELAY) { + while(pThis->iQueueSize >= pThis->iFullDlyMrk&& ! glbl.GetGlobalInputTermState()) { + /* We have a problem during shutdown if we block eternally. In that + * case, the the input thread cannot be terminated. So we wake up + * from time to time to check for termination. + * TODO/v6(at earliest): check if we could signal the condition during + * shutdown. However, this requires new queue registries and thus is + * far to much change for a stable version (and I am still not sure it + * is worth the effort, given how seldom this situation occurs and how + * few resources the wakeups need). -- rgerhards, 2012-05-03 + * In any case, this was the old code (if we do the TODO): + * pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); + */ + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: FullDelay mark reached for full " + "delayable message - blocking, queue size is %d.\n", pThis->iQueueSize); + timeoutComp(&t, 1000); + err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); + if(err != 0 && err != ETIMEDOUT) { + /* Something is really wrong now. Report to debug log and abort the + * wait. That keeps us running, even though we may lose messages. + */ + DBGOPRINT((obj_t*) pThis, "potential program bug: pthread_cond_timedwait()" + "/fulldelay returned %d\n", err); + break; + + } + DBGPRINTF("wti worker in full delay timed out, checking termination...\n"); + } + } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY && !glbl.GetGlobalInputTermState()) { + if(pThis->iQueueSize >= pThis->iLightDlyMrk) { + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: LightDelay mark reached for light " + "delayable message - blocking a bit.\n"); + timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ + err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); + if(err != 0 && err != ETIMEDOUT) { + /* Something is really wrong now. Report to debug log */ + DBGOPRINT((obj_t*) pThis, "potential program bug: pthread_cond_timedwait()" + "/lightdelay returned %d\n", err); + + } + } + } + + /* from our regular flow control settings, we are now ready to enqueue the object. + * However, we now need to do a check if the queue permits to add more data. If that + * is not the case, basic flow control enters the field, which means we wait for + * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14 + */ + while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) + || ((pThis->qType == QUEUETYPE_DISK || pThis->bIsDA) && pThis->sizeOnDiskMax != 0 + && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { + STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); + if(pThis->toEnq == 0 || pThis->bEnqOnly) { + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - configured for immediate " + "discarding QueueSize=%d MaxQueueSize=%d sizeOnDisk=%lld " + "sizeOnDiskMax=%lld\n", pThis->iQueueSize, pThis->iMaxQueueSize, + pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax); + STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); + msgDestruct(&pMsg); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } else { + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - waiting %dms to drain.\n", + pThis->toEnq); + if(glbl.GetGlobalInputTermState()) { + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL, discard due to " + "FORCE_TERM.\n"); + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + timeoutComp(&t, pThis->toEnq); + const int r = pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t); + if(dbgTimeoutToStderr && r != 0) { + fprintf(stderr, "%lld: queue timeout(%dms), error %d%s, " + "lost message %s\n", (long long) time(NULL), pThis->toEnq, + r, ( r == ETIMEDOUT) ? "[ETIMEDOUT]" : "", pMsg->pszRawMsg); + } + if(r == ETIMEDOUT) { + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: cond timeout, dropping message!\n"); + STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); + msgDestruct(&pMsg); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } else if(r != 0) { + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: cond error %d, dropping message!\n", r); + STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); + msgDestruct(&pMsg); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } + dbgoprint((obj_t*) pThis, "doEnqSingleObject: wait solved queue full condition, enqueing\n"); + } + } + + /* and finally enqueue the message */ + CHKiRet(qqueueAdd(pThis, pMsg)); + STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize); + + /* check if we had a file rollover and need to persist + * the .qi file for robustness reasons. + * Note: the n=2 write is required for closing the old file and + * the n=1 write is required after opening and writing to the new + * file. + */ + if(pThis->tVars.disk.nForcePersist > 0) { + DBGOPRINT((obj_t*) pThis, ".qi file write required for robustness reasons (n=%d)\n", + pThis->tVars.disk.nForcePersist); + pThis->tVars.disk.nForcePersist--; + qqueuePersist(pThis, QUEUE_CHECKPOINT); + } + +finalize_it: + RETiRet; +} + +/* ------------------------------ multi-enqueue functions ------------------------------ */ +/* enqueue multiple user data elements at once. The aim is to provide a faster interface + * for object submission. Uses the multi_submit_t helper object. + * Please note that this function is not cancel-safe and consequently + * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE + * during its execution. If that is not done, race conditions occur if the + * thread is canceled (most important use case is input module termination). + * rgerhards, 2009-06-16 + * Note: there now exists multiple different functions implementing specially + * optimized algorithms for different config cases. -- rgerhards, 2010-06-09 + */ +/* now the function for all modes but direct */ +static rsRetVal +qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +{ + int iCancelStateSave; + int i; + rsRetVal localRet; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pMultiSub != NULL); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); + for(i = 0 ; i < pMultiSub->nElem ; ++i) { + localRet = doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]); + if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) + ABORT_FINALIZE(localRet); + } + qqueueChkPersist(pThis, pMultiSub->nElem); + +finalize_it: + /* make sure at least one worker is running. */ + qqueueAdviseMaxWorkers(pThis); + /* and release the mutex */ + d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + + RETiRet; +} + +/* now, the same function, but for direct mode */ +static rsRetVal +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +{ + int i; + wti_t *pWti; + DEFiRet; + + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + + for(i = 0 ; i < pMultiSub->nElem ; ++i) { + CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); + } + +finalize_it: + RETiRet; +} +/* ------------------------------ END multi-enqueue functions ------------------------------ */ + + +/* enqueue a new user data element + * Enqueues the new element and awakes worker thread. + */ +rsRetVal +qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, smsg_t *pMsg) +{ + DEFiRet; + int iCancelStateSave; + ISOBJ_TYPE_assert(pThis, qqueue); + + const int isNonDirectQ = pThis->qType != QUEUETYPE_DIRECT; + + if(isNonDirectQ) { + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); + } + + CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg)); + + qqueueChkPersist(pThis, 1); + +finalize_it: + if(isNonDirectQ) { + /* make sure at least one worker is running. */ + qqueueAdviseMaxWorkers(pThis); + /* and release the mutex */ + d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + DBGOPRINT((obj_t*) pThis, "EnqueueMsg advised worker start\n"); + } + + RETiRet; +} + + +/* are any queue params set at all? 1 - yes, 0 - no + * We need to evaluate the param block for this function, which is somewhat + * inefficient. HOWEVER, this is only done during config load, so we really + * don't care... -- rgerhards, 2013-05-10 + */ +int +queueCnfParamsSet(struct nvlst *lst) +{ + int r; + struct cnfparamvals *pvals; + + pvals = nvlstGetParams(lst, &pblk, NULL); + r = cnfparamvalsIsSet(&pblk, pvals); + cnfparamvalsDestruct(pvals, &pblk); + return r; +} + + +static rsRetVal +initCryprov(qqueue_t *pThis, struct nvlst *lst) +{ + uchar szDrvrName[1024]; + DEFiRet; + + if(snprintf((char*)szDrvrName, sizeof(szDrvrName), "lmcry_%s", pThis->cryprovName) + == sizeof(szDrvrName)) { + LogError(0, RS_RET_ERR, "queue: crypto provider " + "name is too long: '%s' - encryption disabled", + pThis->cryprovName); + ABORT_FINALIZE(RS_RET_ERR); + } + pThis->cryprovNameFull = ustrdup(szDrvrName); + + pThis->cryprov.ifVersion = cryprovCURR_IF_VERSION; + /* The pDrvrName+2 below is a hack to obtain the object name. It + * safes us to have yet another variable with the name without "lm" in + * front of it. If we change the module load interface, we may re-think + * about this hack, but for the time being it is efficient and clean enough. + */ + if(obj.UseObj(__FILE__, szDrvrName, szDrvrName, (void*) &pThis->cryprov) + != RS_RET_OK) { + LogError(0, RS_RET_LOAD_ERROR, "queue: could not load " + "crypto provider '%s' - encryption disabled", + szDrvrName); + ABORT_FINALIZE(RS_RET_CRYPROV_ERR); + } + + if(pThis->cryprov.Construct(&pThis->cryprovData) != RS_RET_OK) { + LogError(0, RS_RET_CRYPROV_ERR, "queue: error constructing " + "crypto provider %s dataset - encryption disabled", + szDrvrName); + ABORT_FINALIZE(RS_RET_CRYPROV_ERR); + } + CHKiRet(pThis->cryprov.SetCnfParam(pThis->cryprovData, lst, CRYPROV_PARAMTYPE_DISK)); + + dbgprintf("loaded crypto provider %s, data instance at %p\n", + szDrvrName, pThis->cryprovData); + pThis->useCryprov = 1; +finalize_it: + RETiRet; +} + +/* check the the queue file name is unique. */ +static rsRetVal ATTR_NONNULL() +checkUniqueDiskFile(qqueue_t *const pThis) +{ + DEFiRet; + struct queue_filename *queue_fn_curr = queue_filename_root; + struct queue_filename *newetry = NULL; + const char *const curr_dirname = (pThis->pszSpoolDir == NULL) ? "" : (char*)pThis->pszSpoolDir; + + if(pThis->pszFilePrefix == NULL) { + FINALIZE; /* no disk queue! */ + } + + while(queue_fn_curr != NULL) { + if(!strcmp((const char*) pThis->pszFilePrefix, queue_fn_curr->filename) && + !strcmp(curr_dirname, queue_fn_curr->dirname)) { + parser_errmsg("queue directory '%s' and file name prefix '%s' already used. " + "This is not possible. Please make it unique.", + curr_dirname, pThis->pszFilePrefix); + ABORT_FINALIZE(RS_RET_ERR_QUEUE_FN_DUP); + } + queue_fn_curr = queue_fn_curr->next; + } + + /* name ok, so let's add it to the list */ + CHKmalloc(newetry = calloc(1, sizeof(struct queue_filename))); + CHKmalloc(newetry->filename = strdup((char*) pThis->pszFilePrefix)); + CHKmalloc(newetry->dirname = strdup(curr_dirname)); + newetry->next = queue_filename_root; + queue_filename_root = newetry; + +finalize_it: + if(iRet != RS_RET_OK) { + if(newetry != NULL) { + free((void*)newetry->filename); + free((void*)newetry); + } + } + RETiRet; +} + +void +qqueueCorrectParams(qqueue_t *pThis) +{ + int goodval; /* a "good value" to use for comparisons (different objects) */ + + if(pThis->iMaxQueueSize < 100 + && (pThis->qType == QUEUETYPE_LINKEDLIST || pThis->qType == QUEUETYPE_FIXED_ARRAY)) { + LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "Note: queue.size=\"%d\" is very " + "low and can lead to unpredictable results. See also " + "https://www.rsyslog.com/lower-bound-for-queue-sizes/", + pThis->iMaxQueueSize); + } + + /* we need to do a quick check if our water marks are set plausible. If not, + * we correct the most important shortcomings. + */ + goodval = (pThis->iMaxQueueSize / 100) * 60; + if(pThis->iHighWtrMrk != -1 && pThis->iHighWtrMrk < goodval) { + LogMsg(0, RS_RET_CONF_PARSE_WARNING, LOG_WARNING, "queue \"%s\": high water mark " + "is set quite low at %d. You should only set it below " + "60%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); + } + + if(pThis->iNumWorkerThreads > 1) { + goodval = (pThis->iMaxQueueSize / 100) * 10; + if(pThis->iMinMsgsPerWrkr != -1 && pThis->iMinMsgsPerWrkr < goodval) { + LogMsg(0, RS_RET_CONF_PARSE_WARNING, LOG_WARNING, "queue \"%s\": " + "queue.workerThreadMinimumMessage " + "is set quite low at %d. You should only set it below " + "10%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iMinMsgsPerWrkr, goodval); + } + } + + if(pThis->iDiscardMrk > pThis->iMaxQueueSize) { + LogError(0, RS_RET_PARAM_ERROR, "error: queue \"%s\": " + "queue.discardMark %d is set larger than queue.size", + obj.GetName((obj_t*) pThis), pThis->iDiscardMrk); + } + + goodval = (pThis->iMaxQueueSize / 100) * 80; + if(pThis->iDiscardMrk != -1 && pThis->iDiscardMrk < goodval) { + LogMsg(0, RS_RET_CONF_PARSE_WARNING, LOG_WARNING, + "queue \"%s\": queue.discardMark " + "is set quite low at %d. You should only set it below " + "80%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iDiscardMrk, goodval); + } + + if(pThis->pszFilePrefix != NULL) { /* This means we have a potential DA queue */ + if(pThis->iFullDlyMrk != -1 && pThis->iFullDlyMrk < pThis->iHighWtrMrk) { + LogMsg(0, RS_RET_CONF_WRN_FULLDLY_BELOW_HIGHWTR, LOG_WARNING, + "queue \"%s\": queue.fullDelayMark " + "is set below high water mark. This will result in DA mode " + " NOT being activated for full delayable messages: In many " + "cases this is a configuration error, please check if this " + "is really what you want", + obj.GetName((obj_t*) pThis)); + } + } + + /* now come parameter corrections and defaults */ + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) { + pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; + if(pThis->iHighWtrMrk == 0) { /* guard against very low max queue sizes! */ + pThis->iHighWtrMrk = pThis->iMaxQueueSize; + } + } + if( pThis->iLowWtrMrk < 2 + || pThis->iLowWtrMrk > pThis->iMaxQueueSize + || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) { + pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70; + if(pThis->iLowWtrMrk == 0) { + pThis->iLowWtrMrk = 1; + } + } + + if((pThis->iMinMsgsPerWrkr < 1 + || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize) ) { + pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads; + } + + if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) { + pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; + if(pThis->iFullDlyMrk == 0) { + pThis->iFullDlyMrk = + (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1; + } + } + + if(pThis->iLightDlyMrk == 0) { + pThis->iLightDlyMrk = pThis->iMaxQueueSize; + } + + if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) { + pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70; + if(pThis->iLightDlyMrk == 0) { + pThis->iLightDlyMrk = + (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1; + } + } + + if(pThis->iDiscardMrk < 1 || pThis->iDiscardMrk > pThis->iMaxQueueSize) { + pThis->iDiscardMrk = (pThis->iMaxQueueSize / 100) * 98; + if(pThis->iDiscardMrk == 0) { + /* for very small queues, we disable this by default */ + pThis->iDiscardMrk = pThis->iMaxQueueSize; + } + } + + if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) { + pThis->iDeqBatchSize = pThis->iMaxQueueSize; + } +} + +/* apply all params from param block to queue. Must be called before + * finalizing. This supports the v6 config system. Defaults were already + * set during queue creation. The pvals object is destructed by this + * function. + */ +rsRetVal +qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst) +{ + int i; + struct cnfparamvals *pvals; + int n_params_set = 0; + DEFiRet; + + pvals = nvlstGetParams(lst, &pblk, NULL); + if(pvals == NULL) { + parser_errmsg("error processing queue config parameters"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + if(Debug) { + dbgprintf("queue param blk:\n"); + cnfparamsPrint(&pblk, pvals); + } + for(i = 0 ; i < pblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + n_params_set++; + if(!strcmp(pblk.descr[i].name, "queue.filename")) { + pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr); + } else if(!strcmp(pblk.descr[i].name, "queue.cry.provider")) { + pThis->cryprovName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(pblk.descr[i].name, "queue.spooldirectory")) { + free(pThis->pszSpoolDir); + pThis->pszSpoolDir = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + pThis->lenSpoolDir = es_strlen(pvals[i].val.d.estr); + if(pThis->pszSpoolDir[pThis->lenSpoolDir-1] == '/') { + pThis->pszSpoolDir[pThis->lenSpoolDir-1] = '\0'; + --pThis->lenSpoolDir; + parser_errmsg("queue.spooldirectory must not end with '/', " + "corrected to '%s'", pThis->pszSpoolDir); + } + } else if(!strcmp(pblk.descr[i].name, "queue.size")) { + if(pvals[i].val.d.n > 0x7fffffff) { + parser_warnmsg("queue.size higher than maximum (2147483647) - " + "corrected to maximum"); + pvals[i].val.d.n = 0x7fffffff; + } else if(pvals[i].val.d.n > OVERSIZE_QUEUE_WATERMARK) { + parser_warnmsg("queue.size=%d is very large - is this " + "really intended? More info at " + "https://www.rsyslog.com/avoid-overly-large-in-memory-queues/", + (int) pvals[i].val.d.n); + } + pThis->iMaxQueueSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) { + pThis->iDeqBatchSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.mindequeuebatchsize")) { + pThis->iMinDeqBatchSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.mindequeuebatchsize.timeout")) { + pThis->toMinDeqBatchSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.maxdiskspace")) { + pThis->sizeOnDiskMax = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.highwatermark")) { + pThis->iHighWtrMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.lowwatermark")) { + pThis->iLowWtrMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.fulldelaymark")) { + pThis->iFullDlyMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.lightdelaymark")) { + pThis->iLightDlyMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.discardmark")) { + pThis->iDiscardMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.discardseverity")) { + pThis->iDiscardSeverity = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.checkpointinterval")) { + pThis->iPersistUpdCnt = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.syncqueuefiles")) { + pThis->bSyncQueueFiles = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.type")) { + pThis->qType = (queueType_t) pvals[i].val.d.n; + if(pThis->qType == QUEUETYPE_DIRECT) { + /* if we have a direct queue, we mimic this param was not set. + * Our prime intent is to make sure we detect when "real" params + * are set on a direct queue, and the type setting is obviously + * not relevant here. + */ + n_params_set--; + } + } else if(!strcmp(pblk.descr[i].name, "queue.workerthreads")) { + pThis->iNumWorkerThreads = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutshutdown")) { + pThis->toQShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutactioncompletion")) { + pThis->toActShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutenqueue")) { + pThis->toEnq = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutworkerthreadshutdown")) { + pThis->toWrkShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.workerthreadminimummessages")) { + pThis->iMinMsgsPerWrkr = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.maxfilesize")) { + pThis->iMaxFileSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.saveonshutdown")) { + pThis->bSaveOnShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeueslowdown")) { + pThis->iDeqSlowdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeuetimebegin")) { + pThis->iDeqtWinFromHr = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeuetimeend")) { + pThis->iDeqtWinToHr = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.samplinginterval")) { + pThis->iSmpInterval = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.takeflowctlfrommsg")) { + pThis->takeFlowCtlFromMsg = pvals[i].val.d.n; + } else { + DBGPRINTF("queue: program error, non-handled " + "param '%s'\n", pblk.descr[i].name); + } + } + + checkUniqueDiskFile(pThis); + + if(pThis->qType == QUEUETYPE_DIRECT) { + if(n_params_set > 0) { + LogMsg(0, RS_RET_OK, LOG_WARNING, "warning on queue '%s': " + "queue is in direct mode, but parameters have been set. " + "These PARAMETERS cannot be applied and WILL BE IGNORED.", + obj.GetName((obj_t*) pThis)); + } + } else if(pThis->qType == QUEUETYPE_DISK) { + if(pThis->pszFilePrefix == NULL) { + LogError(0, RS_RET_QUEUE_DISK_NO_FN, "error on queue '%s', disk mode selected, but " + "no queue file name given; queue type changed to 'linkedList'", + obj.GetName((obj_t*) pThis)); + pThis->qType = QUEUETYPE_LINKEDLIST; + } + } + + if(pThis->pszFilePrefix == NULL && pThis->cryprovName != NULL) { + LogError(0, RS_RET_QUEUE_CRY_DISK_ONLY, "error on queue '%s', crypto provider can " + "only be set for disk or disk assisted queue - ignored", + obj.GetName((obj_t*) pThis)); + free(pThis->cryprovName); + pThis->cryprovName = NULL; + } + + if(pThis->cryprovName != NULL) { + initCryprov(pThis, lst); + } + + cnfparamvalsDestruct(pvals, &pblk); +finalize_it: + RETiRet; +} + +/* return 1 if the content of two qqueue_t structs equal */ +int +queuesEqual(qqueue_t *pOld, qqueue_t *pNew) +{ + return ( + NUM_EQUALS(qType) && + NUM_EQUALS(iMaxQueueSize) && + NUM_EQUALS(iDeqBatchSize) && + NUM_EQUALS(iMinDeqBatchSize) && + NUM_EQUALS(toMinDeqBatchSize) && + NUM_EQUALS(sizeOnDiskMax) && + NUM_EQUALS(iHighWtrMrk) && + NUM_EQUALS(iLowWtrMrk) && + NUM_EQUALS(iFullDlyMrk) && + NUM_EQUALS(iLightDlyMrk) && + NUM_EQUALS(iDiscardMrk) && + NUM_EQUALS(iDiscardSeverity) && + NUM_EQUALS(iPersistUpdCnt) && + NUM_EQUALS(bSyncQueueFiles) && + NUM_EQUALS(iNumWorkerThreads) && + NUM_EQUALS(toQShutdown) && + NUM_EQUALS(toActShutdown) && + NUM_EQUALS(toEnq) && + NUM_EQUALS(toWrkShutdown) && + NUM_EQUALS(iMinMsgsPerWrkr) && + NUM_EQUALS(iMaxFileSize) && + NUM_EQUALS(bSaveOnShutdown) && + NUM_EQUALS(iDeqSlowdown) && + NUM_EQUALS(iDeqtWinFromHr) && + NUM_EQUALS(iDeqtWinToHr) && + NUM_EQUALS(iSmpInterval) && + NUM_EQUALS(takeFlowCtlFromMsg) && + USTR_EQUALS(pszFilePrefix) && + USTR_EQUALS(cryprovName) + ); +} + + +/* some simple object access methods */ +DEFpropSetMeth(qqueue, bSyncQueueFiles, int) +DEFpropSetMeth(qqueue, iPersistUpdCnt, int) +DEFpropSetMeth(qqueue, iDeqtWinFromHr, int) +DEFpropSetMeth(qqueue, iDeqtWinToHr, int) +DEFpropSetMeth(qqueue, toQShutdown, long) +DEFpropSetMeth(qqueue, toActShutdown, long) +DEFpropSetMeth(qqueue, toWrkShutdown, long) +DEFpropSetMeth(qqueue, toEnq, long) +DEFpropSetMeth(qqueue, iHighWtrMrk, int) +DEFpropSetMeth(qqueue, iLowWtrMrk, int) +DEFpropSetMeth(qqueue, iDiscardMrk, int) +DEFpropSetMeth(qqueue, iDiscardSeverity, int) +DEFpropSetMeth(qqueue, iLightDlyMrk, int) +DEFpropSetMeth(qqueue, iNumWorkerThreads, int) +DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) +DEFpropSetMeth(qqueue, bSaveOnShutdown, int) +DEFpropSetMeth(qqueue, pAction, action_t*) +DEFpropSetMeth(qqueue, iDeqSlowdown, int) +DEFpropSetMeth(qqueue, iDeqBatchSize, int) +DEFpropSetMeth(qqueue, iMinDeqBatchSize, int) +DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) +DEFpropSetMeth(qqueue, iSmpInterval, int) + + +/* This function can be used as a generic way to set properties. Only the subset + * of properties required to read persisted property bags is supported. This + * functions shall only be called by the property bag reader, thus it is static. + * rgerhards, 2008-01-11 + */ +#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1) +static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pProp != NULL); + + if(isProp("iQueueSize")) { + pThis->iQueueSize = pProp->val.num; +# ifdef ENABLE_IMDIAG + iOverallQueueSize += pThis->iQueueSize; +# endif + } else if(isProp("tVars.disk.sizeOnDisk")) { + pThis->tVars.disk.sizeOnDisk = pProp->val.num; + } else if(isProp("qType")) { + if(pThis->qType != pProp->val.num) + ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH); + } + +finalize_it: + RETiRet; +} +#undef isProp + +/* dummy */ +static rsRetVal qqueueQueryInterface(interface_t __attribute__((unused)) *i) { return RS_RET_NOT_IMPLEMENTED; } + +/* Initialize the stream class. Must be called as the very first method + * before anything else is called inside this class. + * rgerhards, 2008-01-09 + */ +BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) + /* request objects we use */ + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(strm, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + + /* now set our own handlers */ + OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty); +ENDObjClassInit(qqueue) |