diff options
Diffstat (limited to 'runtime/wti.c')
-rw-r--r-- | runtime/wti.c | 573 |
1 files changed, 573 insertions, 0 deletions
diff --git a/runtime/wti.c b/runtime/wti.c new file mode 100644 index 0000000..96bae4d --- /dev/null +++ b/runtime/wti.c @@ -0,0 +1,573 @@ +/* wti.c + * + * This file implements the worker thread instance (wti) class. + * + * File begun on 2008-01-20 by RGerhards based on functions from the + * previous queue object class (the wti functions have been extracted) + * + * There is some in-depth documentation available in doc/dev_queue.html + * (and in the web doc set on https://www.rsyslog.com/doc/). Be sure to read it + * if you are getting aquainted to the object. + * + * Copyright 2008-2019 Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <pthread.h> +#include <errno.h> + +#include "rsyslog.h" +#include "stringbuf.h" +#include "srUtils.h" +#include "errmsg.h" +#include "wtp.h" +#include "wti.h" +#include "obj.h" +#include "glbl.h" +#include "action.h" +#include "atomic.h" +#include "rsconf.h" + +/* static data */ +DEFobjStaticHelpers +DEFobjCurrIf(glbl) + +pthread_key_t thrd_wti_key; + + +/* forward-definitions */ + +/* methods */ + +/* get the header for debug messages + * The caller must NOT free or otherwise modify the returned string! + */ +uchar * ATTR_NONNULL() +wtiGetDbgHdr(const wti_t *const pThis) +{ + ISOBJ_TYPE_assert(pThis, wti); + + if(pThis->pszDbgHdr == NULL) + return (uchar*) "wti"; /* should not normally happen */ + else + return pThis->pszDbgHdr; +} + + +/* return the current worker processing state. For the sake of + * simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17 + */ +int ATTR_NONNULL() +wtiGetState(wti_t *pThis) +{ + return ATOMIC_FETCH_32BIT(&pThis->bIsRunning, &pThis->mutIsRunning); +} + +/* join terminated worker thread + * This may be called in any thread state, it will be a NOP if the + * thread is not to join. + */ +void ATTR_NONNULL() +wtiJoinThrd(wti_t *const pThis) +{ + int r; + ISOBJ_TYPE_assert(pThis, wti); + if(wtiGetState(pThis) == WRKTHRD_WAIT_JOIN) { + DBGPRINTF("%s: joining terminated worker\n", wtiGetDbgHdr(pThis)); + if((r = pthread_join(pThis->thrdID, NULL)) != 0) { + LogMsg(r, RS_RET_INTERNAL_ERROR, LOG_WARNING, + "rsyslog bug? wti cannot join terminated wrkr"); + } + DBGPRINTF("%s: worker fully terminated\n", wtiGetDbgHdr(pThis)); + wtiSetState(pThis, WRKTHRD_STOPPED); + if(dbgTimeoutToStderr) { + fprintf(stderr, "rsyslog debug: %s: thread joined\n", + wtiGetDbgHdr(pThis)); + } + } +} + +/* Set this thread to "always running" state (can not be unset) + * rgerhards, 2009-07-20 + */ +rsRetVal ATTR_NONNULL() +wtiSetAlwaysRunning(wti_t *pThis) +{ + ISOBJ_TYPE_assert(pThis, wti); + pThis->bAlwaysRunning = RSTRUE; + return RS_RET_OK; +} + +/* Set status (thread is running or not), actually an property of + * use for wtp, but we need to have it per thread instance (thus it + * is inside wti). -- rgerhards, 2009-07-17 + */ +rsRetVal ATTR_NONNULL() +wtiSetState(wti_t *pThis, const int newVal) +{ + ISOBJ_TYPE_assert(pThis, wti); + if(newVal == WRKTHRD_STOPPED) { + ATOMIC_STORE_0_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning); + } else { + ATOMIC_OR_INT_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning, newVal); + } + return RS_RET_OK; +} + + +/* advise all workers to start by interrupting them. That should unblock all srSleep() + * calls. + */ +rsRetVal +wtiWakeupThrd(wti_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, wti); + + if(wtiGetState(pThis)) { + /* we first try the cooperative "cancel" interface */ + pthread_kill(pThis->thrdID, SIGTTIN); + DBGPRINTF("sent SIGTTIN to worker thread %p\n", (void*) pThis->thrdID); + } + + RETiRet; +} + + +/* Cancel the thread. If the thread is not running. But it is save and legal to + * call wtiCancelThrd() in such situations. This function only returns when the + * thread has terminated. Else we may get race conditions all over the code... + * Note that when waiting for the thread to terminate, we do a busy wait, checking + * progress every 10ms. It is very unlikely that we will ever cancel a thread + * and, if so, it will only happen at the end of the rsyslog run. So doing this + * kind of non-optimal wait is considered preferable over using condition variables. + * rgerhards, 2008-02-26 + */ +rsRetVal ATTR_NONNULL() +wtiCancelThrd(wti_t *pThis, const uchar *const cancelobj) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, wti); + + wtiJoinThrd(pThis); + if(wtiGetState(pThis) != WRKTHRD_STOPPED) { + LogMsg(0, RS_RET_ERR, LOG_WARNING, "%s: need to do cooperative cancellation " + "- some data may be lost, increase timeout?", cancelobj); + /* we first try the cooperative "cancel" interface */ + pthread_kill(pThis->thrdID, SIGTTIN); + DBGPRINTF("sent SIGTTIN to worker thread %p, giving it a chance to terminate\n", + (void *) pThis->thrdID); + srSleep(0, 50000); + wtiJoinThrd(pThis); + } + + if(wtiGetState(pThis) != WRKTHRD_STOPPED) { + LogMsg(0, RS_RET_ERR, LOG_WARNING, "%s: need to do hard cancellation", cancelobj); + if(dbgTimeoutToStderr) { + fprintf(stderr, "rsyslog debug: %s: need to do hard cancellation\n", + cancelobj); + } + pthread_cancel(pThis->thrdID); + pthread_kill(pThis->thrdID, SIGTTIN); + DBGPRINTF("cooperative worker termination failed, using cancellation...\n"); + DBGOPRINT((obj_t*) pThis, "canceling worker thread\n"); + pthread_cancel(pThis->thrdID); + /* now wait until the thread terminates... */ + while(wtiGetState(pThis) != WRKTHRD_STOPPED && wtiGetState(pThis) != WRKTHRD_WAIT_JOIN) { + DBGOPRINT((obj_t*) pThis, "waiting on termination, state %d\n", wtiGetState(pThis)); + srSleep(0, 10000); + } + } + + wtiJoinThrd(pThis); + RETiRet; +} + +/* note: this function is only called once in action.c */ +rsRetVal +wtiNewIParam(wti_t *const pWti, action_t *const pAction, actWrkrIParams_t **piparams) +{ + actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + actWrkrIParams_t *iparams; + int newMax; + DEFiRet; + + if(wrkrInfo->p.tx.currIParam == wrkrInfo->p.tx.maxIParams) { + /* we need to extend */ + newMax = (wrkrInfo->p.tx.maxIParams == 0) ? CONF_IPARAMS_BUFSIZE + : 2 * wrkrInfo->p.tx.maxIParams; + CHKmalloc(iparams = realloc(wrkrInfo->p.tx.iparams, + sizeof(actWrkrIParams_t) * pAction->iNumTpls * newMax)); + memset(iparams + (wrkrInfo->p.tx.currIParam * pAction->iNumTpls), 0, + sizeof(actWrkrIParams_t) * pAction->iNumTpls * (newMax - wrkrInfo->p.tx.maxIParams)); + wrkrInfo->p.tx.iparams = iparams; + wrkrInfo->p.tx.maxIParams = newMax; + } + *piparams = wrkrInfo->p.tx.iparams + wrkrInfo->p.tx.currIParam * pAction->iNumTpls; + ++wrkrInfo->p.tx.currIParam; + +finalize_it: + RETiRet; +} + + + +/* Destructor */ +BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(wti) + if(wtiGetState(pThis) != WRKTHRD_STOPPED) { + DBGPRINTF("%s: rsyslog bug: worker not stopped during shutdown\n", + wtiGetDbgHdr(pThis)); + if(dbgTimeoutToStderr) { + fprintf(stderr, "RSYSLOG BUG: %s: worker not stopped during shutdown\n", + wtiGetDbgHdr(pThis)); + } else { + assert(wtiGetState(pThis) == WRKTHRD_STOPPED); + } + } + /* actual destruction */ + batchFree(&pThis->batch); + free(pThis->actWrkrInfo); + pthread_cond_destroy(&pThis->pcondBusy); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); + free(pThis->pszDbgHdr); +ENDobjDestruct(wti) + + +/* Standard-Constructor for the wti object + */ +BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ + INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning); + pthread_cond_init(&pThis->pcondBusy, NULL); +ENDobjConstruct(wti) + + +/* Construction finalizer + * rgerhards, 2008-01-17 + */ +rsRetVal +wtiConstructFinalize(wti_t *pThis) +{ + DEFiRet; + int iDeqBatchSize; + + ISOBJ_TYPE_assert(pThis, wti); + + DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n", + wtiGetDbgHdr(pThis), runConf->actions.iActionNbr); + + /* initialize our thread instance descriptor (no concurrency here) */ + pThis->bIsRunning = WRKTHRD_STOPPED; + + /* must use calloc as we need zero-init */ + CHKmalloc(pThis->actWrkrInfo = calloc(runConf->actions.iActionNbr, sizeof(actWrkrInfo_t))); + + if(pThis->pWtp == NULL) { + dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n"); + FINALIZE; + } + + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ + CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); + CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); + +finalize_it: + RETiRet; +} + + +/* cancellation cleanup handler for queueWorker () + * Most importantly, it must bring back the batch into a consistent state. + * Keep in mind that cancellation is disabled if we run into + * the cancel cleanup handler (and have been cancelled). + * rgerhards, 2008-01-16 + */ +static void +wtiWorkerCancelCleanup(void *arg) +{ + wti_t *pThis = (wti_t*) arg; + wtp_t *pWtp; + + ISOBJ_TYPE_assert(pThis, wti); + pWtp = pThis->pWtp; + ISOBJ_TYPE_assert(pWtp, wtp); + + DBGPRINTF("%s: cancellation cleanup handler called.\n", wtiGetDbgHdr(pThis)); + pWtp->pfObjProcessed(pWtp->pUsr, pThis); + DBGPRINTF("%s: done cancellation cleanup handler.\n", wtiGetDbgHdr(pThis)); + +} + + +/* wait for queue to become non-empty or timeout + * this is introduced as helper to support queue minimum batch sizes, but may + * also be used for other cases. This function waits until the queue is non-empty + * or a timeout occurs. The timeout must be passed in as absolute value. + * @returns 0 if timeout occurs (queue still empty), something else otherwise + */ +int ATTR_NONNULL() +wtiWaitNonEmpty(wti_t *const pThis, const struct timespec timeout) +{ + wtp_t *__restrict__ const pWtp = pThis->pWtp; + int r; + + DBGOPRINT((obj_t*) pThis, "waiting on queue to become non-empty\n"); + if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &timeout) != 0) { + r = 0; + } else { + r = 1; + } + DBGOPRINT((obj_t*) pThis, "waited on queue to become non-empty, result %d\n", r); + return r; +} + + +/* wait for queue to become non-empty or timeout + * helper to wtiWorker. Note the the predicate is + * re-tested by the caller, so it is OK to NOT do it here. + * rgerhards, 2009-05-20 + */ +static void ATTR_NONNULL() +doIdleProcessing(wti_t *const pThis, wtp_t *const pWtp, int *const pbInactivityTOOccurred) +{ + struct timespec t; + + DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); + + if(pThis->bAlwaysRunning) { + /* never shut down any started worker */ + d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr); + } else { + timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ + if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) { + DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); + *pbInactivityTOOccurred = 1; /* indicate we had a timeout */ + } + } + DBGOPRINT((obj_t*) pThis, "worker awoke from idle processing\n"); +} + + +/* generic worker thread framework. Note that we prohibit cancellation + * during almost all times, because it can have very undesired side effects. + * However, we may need to cancel a thread if the consumer blocks for too + * long (during shutdown). So what we do is block cancellation, and every + * consumer must enable it during the periods where it is safe. + */ +PRAGMA_DIAGNOSTIC_PUSH +PRAGMA_IGNORE_Wempty_body +rsRetVal +wtiWorker(wti_t *__restrict__ const pThis) +{ + wtp_t *__restrict__ const pWtp = pThis->pWtp; /* our worker thread pool -- shortcut */ + action_t *__restrict__ pAction; + rsRetVal localRet; + rsRetVal terminateRet; + actWrkrInfo_t *__restrict__ wrkrInfo; + int iCancelStateSave; + int i, j, k; + DEFiRet; + + dbgSetThrdName(pThis->pszDbgHdr); + pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + int bInactivityTOOccurred = 0; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + DBGPRINTF("wti %p: worker starting\n", pThis); + /* now we have our identity, on to real processing */ + + /* note: in this loop, the mutex is "never" unlocked. Of course, + * this is not true: it actually is unlocked when the actual processing + * is done, as part of pWtp->pfDoWork() processing. Note that this + * function is required to re-lock it when done. We cannot do the + * lock/unlock here ourselfs, as pfDoWork() needs to access queue + * structures itself. + * The same goes for pfRateLimiter(). While we could unlock/lock when + * we call it, in practice the function is often called without any + * ratelimiting actually done. Only the rate limiter itself knows + * that. As such, it needs to bear the burden of doing the locking + * when required. -- rgerhards, 2013-11-20 + */ + d_pthread_mutex_lock(pWtp->pmutUsr); + while(1) { /* loop will be broken below */ + if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ + pWtp->pfRateLimiter(pWtp->pUsr); + } + + /* first check if we are in shutdown process (but evaluate a bit later) */ + terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); + if(terminateRet == RS_RET_TERMINATE_NOW) { + /* we now need to free the old batch */ + localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); + DBGOPRINT((obj_t*) pThis, "terminating worker because of " + "TERMINATE_NOW mode, del iRet %d\n", localRet); + break; + } + + /* try to execute and process whatever we have */ + localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); + + if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) { + break; /* end of loop */ + } else if(localRet == RS_RET_IDLE) { + if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccurred) { + DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, " + "bInactivityTOOccurred=%d\n", terminateRet, bInactivityTOOccurred); + break; /* end of loop */ + } + doIdleProcessing(pThis, pWtp, &bInactivityTOOccurred); + continue; /* request next iteration */ + } + + bInactivityTOOccurred = 0; /* reset for next run */ + } + + d_pthread_mutex_unlock(pWtp->pmutUsr); + + DBGPRINTF("DDDD: wti %p: worker cleanup action instances\n", pThis); + for(i = 0 ; i < runConf->actions.iActionNbr ; ++i) { + wrkrInfo = &(pThis->actWrkrInfo[i]); + dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, wrkrInfo->actWrkrData); + if(wrkrInfo->actWrkrData != NULL) { + pAction = wrkrInfo->pAction; + actionRemoveWorker(pAction, wrkrInfo->actWrkrData); + pAction->pMod->mod.om.freeWrkrInstance(wrkrInfo->actWrkrData); + if(pAction->isTransactional) { + /* free iparam "cache" - we need to go through to max! */ + for(j = 0 ; j < wrkrInfo->p.tx.maxIParams ; ++j) { + for(k = 0 ; k < pAction->iNumTpls ; ++k) { + free(actParam(wrkrInfo->p.tx.iparams, + pAction->iNumTpls, j, k).param); + } + } + free(wrkrInfo->p.tx.iparams); + wrkrInfo->p.tx.iparams = NULL; + wrkrInfo->p.tx.currIParam = 0; + wrkrInfo->p.tx.maxIParams = 0; + } else { + releaseDoActionParams(pAction, pThis, 1); + } + wrkrInfo->actWrkrData = NULL; /* re-init for next activation */ + } + } + + /* indicate termination */ + pthread_cleanup_pop(0); /* remove cleanup handler */ + pthread_setcancelstate(iCancelStateSave, NULL); + dbgprintf("wti %p: exiting\n", pThis); + + RETiRet; +} +PRAGMA_DIAGNOSTIC_POP + + +/* some simple object access methods */ +DEFpropSetMeth(wti, pWtp, wtp_t*) + +/* set the debug header message + * The passed-in string is duplicated. So if the caller does not need + * it any longer, it must free it. Must be called only before object is finalized. + * rgerhards, 2008-01-09 + */ +rsRetVal +wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, const size_t lenMsg) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, wti); + assert(pszMsg != NULL); + + if(lenMsg < 1) + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + + if(pThis->pszDbgHdr != NULL) { + free(pThis->pszDbgHdr); + } + + if((pThis->pszDbgHdr = malloc(lenMsg + 1)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */ + +finalize_it: + RETiRet; +} + + +/* This function returns (and creates if necessary) a dummy wti suitable + * for use by the rule engine. It is intended to be used for direct-mode + * main queues (folks, don't do that!). Once created, data is stored in + * thread-specific storage. + * Note: we do NOT do error checking -- if this functions fails, all the + * rest will fail as well... (also, it will only fail under OOM, so...). + * Memleak: we leak pWti's when run in direct mode. However, this is only + * a cosmetic leak, as we need them until all inputs are terminated, + * what means essentially until rsyslog itself is terminated. So we + * don't care -- it's just not nice in valgrind, but that's it. + */ +wti_t * +wtiGetDummy(void) +{ + wti_t *pWti; + + pWti = (wti_t*) pthread_getspecific(thrd_wti_key); + if(pWti == NULL) { + wtiConstruct(&pWti); + if(pWti != NULL) + wtiConstructFinalize(pWti); + if(pthread_setspecific(thrd_wti_key, pWti) != 0) { + DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n"); + } + } + return pWti; +} + +/* dummy */ +static rsRetVal wtiQueryInterface(interface_t __attribute__((unused)) *i) { return RS_RET_NOT_IMPLEMENTED; } + +/* exit our class + */ +BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */ +CODESTARTObjClassExit(nsdsel_gtls) + /* release objects we no longer need */ + objRelease(glbl, CORE_COMPONENT); + pthread_key_delete(thrd_wti_key); +ENDObjClassExit(wti) + + +/* Initialize the wti class. Must be called as the very first method + * before anything else is called inside this class. + * rgerhards, 2008-01-09 + */ +BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */ + int r; + /* request objects we use */ + CHKiRet(objUse(glbl, CORE_COMPONENT)); + r = pthread_key_create(&thrd_wti_key, NULL); + if(r != 0) { + dbgprintf("wti.c: pthread_key_create failed\n"); + ABORT_FINALIZE(RS_RET_ERR); + } +ENDObjClassInit(wti) |