summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c573
1 files changed, 573 insertions, 0 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
new file mode 100644
index 0000000..96bae4d
--- /dev/null
+++ b/runtime/wti.c
@@ -0,0 +1,573 @@
+/* wti.c
+ *
+ * This file implements the worker thread instance (wti) class.
+ *
+ * File begun on 2008-01-20 by RGerhards based on functions from the
+ * previous queue object class (the wti functions have been extracted)
+ *
+ * There is some in-depth documentation available in doc/dev_queue.html
+ * (and in the web doc set on https://www.rsyslog.com/doc/). Be sure to read it
+ * if you are getting aquainted to the object.
+ *
+ * Copyright 2008-2019 Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <pthread.h>
+#include <errno.h>
+
+#include "rsyslog.h"
+#include "stringbuf.h"
+#include "srUtils.h"
+#include "errmsg.h"
+#include "wtp.h"
+#include "wti.h"
+#include "obj.h"
+#include "glbl.h"
+#include "action.h"
+#include "atomic.h"
+#include "rsconf.h"
+
+/* static data */
+DEFobjStaticHelpers
+DEFobjCurrIf(glbl)
+
+pthread_key_t thrd_wti_key;
+
+
+/* forward-definitions */
+
+/* methods */
+
+/* get the header for debug messages
+ * The caller must NOT free or otherwise modify the returned string!
+ */
+uchar * ATTR_NONNULL()
+wtiGetDbgHdr(const wti_t *const pThis)
+{
+ ISOBJ_TYPE_assert(pThis, wti);
+
+ if(pThis->pszDbgHdr == NULL)
+ return (uchar*) "wti"; /* should not normally happen */
+ else
+ return pThis->pszDbgHdr;
+}
+
+
+/* return the current worker processing state. For the sake of
+ * simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17
+ */
+int ATTR_NONNULL()
+wtiGetState(wti_t *pThis)
+{
+ return ATOMIC_FETCH_32BIT(&pThis->bIsRunning, &pThis->mutIsRunning);
+}
+
+/* join terminated worker thread
+ * This may be called in any thread state, it will be a NOP if the
+ * thread is not to join.
+ */
+void ATTR_NONNULL()
+wtiJoinThrd(wti_t *const pThis)
+{
+ int r;
+ ISOBJ_TYPE_assert(pThis, wti);
+ if(wtiGetState(pThis) == WRKTHRD_WAIT_JOIN) {
+ DBGPRINTF("%s: joining terminated worker\n", wtiGetDbgHdr(pThis));
+ if((r = pthread_join(pThis->thrdID, NULL)) != 0) {
+ LogMsg(r, RS_RET_INTERNAL_ERROR, LOG_WARNING,
+ "rsyslog bug? wti cannot join terminated wrkr");
+ }
+ DBGPRINTF("%s: worker fully terminated\n", wtiGetDbgHdr(pThis));
+ wtiSetState(pThis, WRKTHRD_STOPPED);
+ if(dbgTimeoutToStderr) {
+ fprintf(stderr, "rsyslog debug: %s: thread joined\n",
+ wtiGetDbgHdr(pThis));
+ }
+ }
+}
+
+/* Set this thread to "always running" state (can not be unset)
+ * rgerhards, 2009-07-20
+ */
+rsRetVal ATTR_NONNULL()
+wtiSetAlwaysRunning(wti_t *pThis)
+{
+ ISOBJ_TYPE_assert(pThis, wti);
+ pThis->bAlwaysRunning = RSTRUE;
+ return RS_RET_OK;
+}
+
+/* Set status (thread is running or not), actually an property of
+ * use for wtp, but we need to have it per thread instance (thus it
+ * is inside wti). -- rgerhards, 2009-07-17
+ */
+rsRetVal ATTR_NONNULL()
+wtiSetState(wti_t *pThis, const int newVal)
+{
+ ISOBJ_TYPE_assert(pThis, wti);
+ if(newVal == WRKTHRD_STOPPED) {
+ ATOMIC_STORE_0_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning);
+ } else {
+ ATOMIC_OR_INT_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning, newVal);
+ }
+ return RS_RET_OK;
+}
+
+
+/* advise all workers to start by interrupting them. That should unblock all srSleep()
+ * calls.
+ */
+rsRetVal
+wtiWakeupThrd(wti_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+ if(wtiGetState(pThis)) {
+ /* we first try the cooperative "cancel" interface */
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ DBGPRINTF("sent SIGTTIN to worker thread %p\n", (void*) pThis->thrdID);
+ }
+
+ RETiRet;
+}
+
+
+/* Cancel the thread. If the thread is not running. But it is save and legal to
+ * call wtiCancelThrd() in such situations. This function only returns when the
+ * thread has terminated. Else we may get race conditions all over the code...
+ * Note that when waiting for the thread to terminate, we do a busy wait, checking
+ * progress every 10ms. It is very unlikely that we will ever cancel a thread
+ * and, if so, it will only happen at the end of the rsyslog run. So doing this
+ * kind of non-optimal wait is considered preferable over using condition variables.
+ * rgerhards, 2008-02-26
+ */
+rsRetVal ATTR_NONNULL()
+wtiCancelThrd(wti_t *pThis, const uchar *const cancelobj)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+ wtiJoinThrd(pThis);
+ if(wtiGetState(pThis) != WRKTHRD_STOPPED) {
+ LogMsg(0, RS_RET_ERR, LOG_WARNING, "%s: need to do cooperative cancellation "
+ "- some data may be lost, increase timeout?", cancelobj);
+ /* we first try the cooperative "cancel" interface */
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ DBGPRINTF("sent SIGTTIN to worker thread %p, giving it a chance to terminate\n",
+ (void *) pThis->thrdID);
+ srSleep(0, 50000);
+ wtiJoinThrd(pThis);
+ }
+
+ if(wtiGetState(pThis) != WRKTHRD_STOPPED) {
+ LogMsg(0, RS_RET_ERR, LOG_WARNING, "%s: need to do hard cancellation", cancelobj);
+ if(dbgTimeoutToStderr) {
+ fprintf(stderr, "rsyslog debug: %s: need to do hard cancellation\n",
+ cancelobj);
+ }
+ pthread_cancel(pThis->thrdID);
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ DBGPRINTF("cooperative worker termination failed, using cancellation...\n");
+ DBGOPRINT((obj_t*) pThis, "canceling worker thread\n");
+ pthread_cancel(pThis->thrdID);
+ /* now wait until the thread terminates... */
+ while(wtiGetState(pThis) != WRKTHRD_STOPPED && wtiGetState(pThis) != WRKTHRD_WAIT_JOIN) {
+ DBGOPRINT((obj_t*) pThis, "waiting on termination, state %d\n", wtiGetState(pThis));
+ srSleep(0, 10000);
+ }
+ }
+
+ wtiJoinThrd(pThis);
+ RETiRet;
+}
+
+/* note: this function is only called once in action.c */
+rsRetVal
+wtiNewIParam(wti_t *const pWti, action_t *const pAction, actWrkrIParams_t **piparams)
+{
+ actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+ actWrkrIParams_t *iparams;
+ int newMax;
+ DEFiRet;
+
+ if(wrkrInfo->p.tx.currIParam == wrkrInfo->p.tx.maxIParams) {
+ /* we need to extend */
+ newMax = (wrkrInfo->p.tx.maxIParams == 0) ? CONF_IPARAMS_BUFSIZE
+ : 2 * wrkrInfo->p.tx.maxIParams;
+ CHKmalloc(iparams = realloc(wrkrInfo->p.tx.iparams,
+ sizeof(actWrkrIParams_t) * pAction->iNumTpls * newMax));
+ memset(iparams + (wrkrInfo->p.tx.currIParam * pAction->iNumTpls), 0,
+ sizeof(actWrkrIParams_t) * pAction->iNumTpls * (newMax - wrkrInfo->p.tx.maxIParams));
+ wrkrInfo->p.tx.iparams = iparams;
+ wrkrInfo->p.tx.maxIParams = newMax;
+ }
+ *piparams = wrkrInfo->p.tx.iparams + wrkrInfo->p.tx.currIParam * pAction->iNumTpls;
+ ++wrkrInfo->p.tx.currIParam;
+
+finalize_it:
+ RETiRet;
+}
+
+
+
+/* Destructor */
+BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(wti)
+ if(wtiGetState(pThis) != WRKTHRD_STOPPED) {
+ DBGPRINTF("%s: rsyslog bug: worker not stopped during shutdown\n",
+ wtiGetDbgHdr(pThis));
+ if(dbgTimeoutToStderr) {
+ fprintf(stderr, "RSYSLOG BUG: %s: worker not stopped during shutdown\n",
+ wtiGetDbgHdr(pThis));
+ } else {
+ assert(wtiGetState(pThis) == WRKTHRD_STOPPED);
+ }
+ }
+ /* actual destruction */
+ batchFree(&pThis->batch);
+ free(pThis->actWrkrInfo);
+ pthread_cond_destroy(&pThis->pcondBusy);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
+ free(pThis->pszDbgHdr);
+ENDobjDestruct(wti)
+
+
+/* Standard-Constructor for the wti object
+ */
+BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
+ INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
+ pthread_cond_init(&pThis->pcondBusy, NULL);
+ENDobjConstruct(wti)
+
+
+/* Construction finalizer
+ * rgerhards, 2008-01-17
+ */
+rsRetVal
+wtiConstructFinalize(wti_t *pThis)
+{
+ DEFiRet;
+ int iDeqBatchSize;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+ DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n",
+ wtiGetDbgHdr(pThis), runConf->actions.iActionNbr);
+
+ /* initialize our thread instance descriptor (no concurrency here) */
+ pThis->bIsRunning = WRKTHRD_STOPPED;
+
+ /* must use calloc as we need zero-init */
+ CHKmalloc(pThis->actWrkrInfo = calloc(runConf->actions.iActionNbr, sizeof(actWrkrInfo_t)));
+
+ if(pThis->pWtp == NULL) {
+ dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n");
+ FINALIZE;
+ }
+
+ /* we now alloc the array for user pointers. We obtain the max from the queue itself. */
+ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
+ CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* cancellation cleanup handler for queueWorker ()
+ * Most importantly, it must bring back the batch into a consistent state.
+ * Keep in mind that cancellation is disabled if we run into
+ * the cancel cleanup handler (and have been cancelled).
+ * rgerhards, 2008-01-16
+ */
+static void
+wtiWorkerCancelCleanup(void *arg)
+{
+ wti_t *pThis = (wti_t*) arg;
+ wtp_t *pWtp;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+ pWtp = pThis->pWtp;
+ ISOBJ_TYPE_assert(pWtp, wtp);
+
+ DBGPRINTF("%s: cancellation cleanup handler called.\n", wtiGetDbgHdr(pThis));
+ pWtp->pfObjProcessed(pWtp->pUsr, pThis);
+ DBGPRINTF("%s: done cancellation cleanup handler.\n", wtiGetDbgHdr(pThis));
+
+}
+
+
+/* wait for queue to become non-empty or timeout
+ * this is introduced as helper to support queue minimum batch sizes, but may
+ * also be used for other cases. This function waits until the queue is non-empty
+ * or a timeout occurs. The timeout must be passed in as absolute value.
+ * @returns 0 if timeout occurs (queue still empty), something else otherwise
+ */
+int ATTR_NONNULL()
+wtiWaitNonEmpty(wti_t *const pThis, const struct timespec timeout)
+{
+ wtp_t *__restrict__ const pWtp = pThis->pWtp;
+ int r;
+
+ DBGOPRINT((obj_t*) pThis, "waiting on queue to become non-empty\n");
+ if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &timeout) != 0) {
+ r = 0;
+ } else {
+ r = 1;
+ }
+ DBGOPRINT((obj_t*) pThis, "waited on queue to become non-empty, result %d\n", r);
+ return r;
+}
+
+
+/* wait for queue to become non-empty or timeout
+ * helper to wtiWorker. Note the the predicate is
+ * re-tested by the caller, so it is OK to NOT do it here.
+ * rgerhards, 2009-05-20
+ */
+static void ATTR_NONNULL()
+doIdleProcessing(wti_t *const pThis, wtp_t *const pWtp, int *const pbInactivityTOOccurred)
+{
+ struct timespec t;
+
+ DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+
+ if(pThis->bAlwaysRunning) {
+ /* never shut down any started worker */
+ d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr);
+ } else {
+ timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
+ if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
+ *pbInactivityTOOccurred = 1; /* indicate we had a timeout */
+ }
+ }
+ DBGOPRINT((obj_t*) pThis, "worker awoke from idle processing\n");
+}
+
+
+/* generic worker thread framework. Note that we prohibit cancellation
+ * during almost all times, because it can have very undesired side effects.
+ * However, we may need to cancel a thread if the consumer blocks for too
+ * long (during shutdown). So what we do is block cancellation, and every
+ * consumer must enable it during the periods where it is safe.
+ */
+PRAGMA_DIAGNOSTIC_PUSH
+PRAGMA_IGNORE_Wempty_body
+rsRetVal
+wtiWorker(wti_t *__restrict__ const pThis)
+{
+ wtp_t *__restrict__ const pWtp = pThis->pWtp; /* our worker thread pool -- shortcut */
+ action_t *__restrict__ pAction;
+ rsRetVal localRet;
+ rsRetVal terminateRet;
+ actWrkrInfo_t *__restrict__ wrkrInfo;
+ int iCancelStateSave;
+ int i, j, k;
+ DEFiRet;
+
+ dbgSetThrdName(pThis->pszDbgHdr);
+ pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
+ int bInactivityTOOccurred = 0;
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ DBGPRINTF("wti %p: worker starting\n", pThis);
+ /* now we have our identity, on to real processing */
+
+ /* note: in this loop, the mutex is "never" unlocked. Of course,
+ * this is not true: it actually is unlocked when the actual processing
+ * is done, as part of pWtp->pfDoWork() processing. Note that this
+ * function is required to re-lock it when done. We cannot do the
+ * lock/unlock here ourselfs, as pfDoWork() needs to access queue
+ * structures itself.
+ * The same goes for pfRateLimiter(). While we could unlock/lock when
+ * we call it, in practice the function is often called without any
+ * ratelimiting actually done. Only the rate limiter itself knows
+ * that. As such, it needs to bear the burden of doing the locking
+ * when required. -- rgerhards, 2013-11-20
+ */
+ d_pthread_mutex_lock(pWtp->pmutUsr);
+ while(1) { /* loop will be broken below */
+ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
+ pWtp->pfRateLimiter(pWtp->pUsr);
+ }
+
+ /* first check if we are in shutdown process (but evaluate a bit later) */
+ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
+ if(terminateRet == RS_RET_TERMINATE_NOW) {
+ /* we now need to free the old batch */
+ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
+ DBGOPRINT((obj_t*) pThis, "terminating worker because of "
+ "TERMINATE_NOW mode, del iRet %d\n", localRet);
+ break;
+ }
+
+ /* try to execute and process whatever we have */
+ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
+
+ if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) {
+ break; /* end of loop */
+ } else if(localRet == RS_RET_IDLE) {
+ if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccurred) {
+ DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, "
+ "bInactivityTOOccurred=%d\n", terminateRet, bInactivityTOOccurred);
+ break; /* end of loop */
+ }
+ doIdleProcessing(pThis, pWtp, &bInactivityTOOccurred);
+ continue; /* request next iteration */
+ }
+
+ bInactivityTOOccurred = 0; /* reset for next run */
+ }
+
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+
+ DBGPRINTF("DDDD: wti %p: worker cleanup action instances\n", pThis);
+ for(i = 0 ; i < runConf->actions.iActionNbr ; ++i) {
+ wrkrInfo = &(pThis->actWrkrInfo[i]);
+ dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, wrkrInfo->actWrkrData);
+ if(wrkrInfo->actWrkrData != NULL) {
+ pAction = wrkrInfo->pAction;
+ actionRemoveWorker(pAction, wrkrInfo->actWrkrData);
+ pAction->pMod->mod.om.freeWrkrInstance(wrkrInfo->actWrkrData);
+ if(pAction->isTransactional) {
+ /* free iparam "cache" - we need to go through to max! */
+ for(j = 0 ; j < wrkrInfo->p.tx.maxIParams ; ++j) {
+ for(k = 0 ; k < pAction->iNumTpls ; ++k) {
+ free(actParam(wrkrInfo->p.tx.iparams,
+ pAction->iNumTpls, j, k).param);
+ }
+ }
+ free(wrkrInfo->p.tx.iparams);
+ wrkrInfo->p.tx.iparams = NULL;
+ wrkrInfo->p.tx.currIParam = 0;
+ wrkrInfo->p.tx.maxIParams = 0;
+ } else {
+ releaseDoActionParams(pAction, pThis, 1);
+ }
+ wrkrInfo->actWrkrData = NULL; /* re-init for next activation */
+ }
+ }
+
+ /* indicate termination */
+ pthread_cleanup_pop(0); /* remove cleanup handler */
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ dbgprintf("wti %p: exiting\n", pThis);
+
+ RETiRet;
+}
+PRAGMA_DIAGNOSTIC_POP
+
+
+/* some simple object access methods */
+DEFpropSetMeth(wti, pWtp, wtp_t*)
+
+/* set the debug header message
+ * The passed-in string is duplicated. So if the caller does not need
+ * it any longer, it must free it. Must be called only before object is finalized.
+ * rgerhards, 2008-01-09
+ */
+rsRetVal
+wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, const size_t lenMsg)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+ assert(pszMsg != NULL);
+
+ if(lenMsg < 1)
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+
+ if(pThis->pszDbgHdr != NULL) {
+ free(pThis->pszDbgHdr);
+ }
+
+ if((pThis->pszDbgHdr = malloc(lenMsg + 1)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+
+ memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function returns (and creates if necessary) a dummy wti suitable
+ * for use by the rule engine. It is intended to be used for direct-mode
+ * main queues (folks, don't do that!). Once created, data is stored in
+ * thread-specific storage.
+ * Note: we do NOT do error checking -- if this functions fails, all the
+ * rest will fail as well... (also, it will only fail under OOM, so...).
+ * Memleak: we leak pWti's when run in direct mode. However, this is only
+ * a cosmetic leak, as we need them until all inputs are terminated,
+ * what means essentially until rsyslog itself is terminated. So we
+ * don't care -- it's just not nice in valgrind, but that's it.
+ */
+wti_t *
+wtiGetDummy(void)
+{
+ wti_t *pWti;
+
+ pWti = (wti_t*) pthread_getspecific(thrd_wti_key);
+ if(pWti == NULL) {
+ wtiConstruct(&pWti);
+ if(pWti != NULL)
+ wtiConstructFinalize(pWti);
+ if(pthread_setspecific(thrd_wti_key, pWti) != 0) {
+ DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n");
+ }
+ }
+ return pWti;
+}
+
+/* dummy */
+static rsRetVal wtiQueryInterface(interface_t __attribute__((unused)) *i) { return RS_RET_NOT_IMPLEMENTED; }
+
+/* exit our class
+ */
+BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(nsdsel_gtls)
+ /* release objects we no longer need */
+ objRelease(glbl, CORE_COMPONENT);
+ pthread_key_delete(thrd_wti_key);
+ENDObjClassExit(wti)
+
+
+/* Initialize the wti class. Must be called as the very first method
+ * before anything else is called inside this class.
+ * rgerhards, 2008-01-09
+ */
+BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */
+ int r;
+ /* request objects we use */
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ r = pthread_key_create(&thrd_wti_key, NULL);
+ if(r != 0) {
+ dbgprintf("wti.c: pthread_key_create failed\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ENDObjClassInit(wti)