summaryrefslogtreecommitdiffstats
path: root/runtime/wti.h
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.h')
-rw-r--r--runtime/wti.h144
1 files changed, 144 insertions, 0 deletions
diff --git a/runtime/wti.h b/runtime/wti.h
new file mode 100644
index 0000000..71eed56
--- /dev/null
+++ b/runtime/wti.h
@@ -0,0 +1,144 @@
+/* Definition of the worker thread instance (wti) class.
+ *
+ * Copyright 2008-2017 Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef WTI_H_INCLUDED
+#define WTI_H_INCLUDED
+
+#include <pthread.h>
+#include <stdlib.h>
+#include "wtp.h"
+#include "obj.h"
+#include "batch.h"
+#include "action.h"
+
+
+#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */
+#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */
+/* 2 currently not being used */
+#define ACT_STATE_RTRY 3 /* failure occurred, trying to restablish ready state */
+#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */
+#define ACT_STATE_DATAFAIL 5 /* suspended due to failure in data, which means the message in
+ questions needs to be dropped as it will always fail. The
+ action must still do a "normal" retry in order to bring
+ it back to regular state. */
+/* note: 3 bit bit field --> highest value is 7! */
+
+typedef struct actWrkrInfo {
+ action_t *pAction;
+ void *actWrkrData;
+ uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an
+ immediate failure following */
+ int iNbrResRtry; /* number of retries since last suspend */
+ sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */
+ struct {
+ unsigned actState : 3;
+ } flags;
+ union {
+ struct {
+ actWrkrIParams_t *iparams;/* dynamically sized array for transactional outputs */
+ int currIParam;
+ int maxIParams; /* current max */
+ } tx;
+ struct {
+ actWrkrIParams_t actParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ } nontx;
+ } p; /* short name for "parameters" */
+} actWrkrInfo_t;
+
+/* the worker thread instance class */
+struct wti_s {
+ BEGINobjInstance;
+ pthread_t thrdID; /* thread ID */
+ int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */
+ sbool bAlwaysRunning; /* should this thread always run? */
+ int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
+ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
+ batch_t batch; /* pointer to an object array meaningful for current user
+ pointer (e.g. queue pUsr data elemt) */
+ uchar *pszDbgHdr; /* header string for debug messages */
+ actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions
+ (sized for max nbr of actions in config!) */
+ pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */
+ DEF_ATOMIC_HELPER_MUT(mutIsRunning)
+ struct {
+ uint8_t script_errno; /* errno-type interface for RainerScript functions */
+ uint8_t bPrevWasSuspended;
+ uint8_t bDoAutoCommit; /* do a commit after each message
+ * this is usually set for batches with 0 element, but may
+ * also be added as a user-selectable option (not implemented yet)
+ */
+ } execState; /* state for the execution engine */
+};
+
+
+/* prototypes */
+rsRetVal wtiConstruct(wti_t **ppThis);
+rsRetVal wtiConstructFinalize(wti_t * const pThis);
+rsRetVal wtiDestruct(wti_t **ppThis);
+rsRetVal wtiWorker(wti_t * const pThis);
+rsRetVal wtiSetDbgHdr(wti_t * const pThis, uchar *pszMsg, size_t lenMsg);
+uchar * ATTR_NONNULL() wtiGetDbgHdr(const wti_t *const pThis);
+rsRetVal wtiCancelThrd(wti_t * const pThis, const uchar *const cancelobj);
+void ATTR_NONNULL() wtiJoinThrd(wti_t *const pThis);
+rsRetVal wtiSetAlwaysRunning(wti_t * const pThis);
+rsRetVal wtiSetState(wti_t * const pThis, int bNew);
+rsRetVal wtiWakeupThrd(wti_t * const pThis);
+int wtiGetState(wti_t * const pThis);
+wti_t *wtiGetDummy(void);
+int ATTR_NONNULL() wtiWaitNonEmpty(wti_t *const pThis, const struct timespec timeout);
+PROTOTYPEObjClassInit(wti);
+PROTOTYPEObjClassExit(wti);
+PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
+PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
+
+#define getActionStateByNbr(pWti, iActNbr) ((uint8_t) ((pWti)->actWrkrInfo[(iActNbr)].flags.actState))
+#define getActionState(pWti, pAction) (((uint8_t) (pWti)->actWrkrInfo[(pAction)->iActionNbr].flags.actState))
+#define setActionState(pWti, pAction, newState) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].flags.actState = \
+(newState))
+#define getActionResumeInRow(pWti, pAction) (((pWti)->actWrkrInfo[(pAction)->iActionNbr].uResumeOKinRow))
+#define setActionResumeInRow(pWti, pAction, val) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].uResumeOKinRow = (val))
+#define incActionResumeInRow(pWti, pAction) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].uResumeOKinRow++)
+#define getActionNbrResRtry(pWti, pAction) (((pWti)->actWrkrInfo[(pAction)->iActionNbr].iNbrResRtry))
+#define setActionNbrResRtry(pWti, pAction, val) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].iNbrResRtry = (val))
+#define incActionNbrResRtry(pWti, pAction) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].iNbrResRtry++)
+#define wtiInitIParam(piparams) (memset((piparams), 0, sizeof(actWrkrIParams_t)))
+
+#define wtiGetScriptErrno(pWti) ((pWti)->execState.script_errno)
+#define wtiSetScriptErrno(pWti, newval) (pWti)->execState.script_errno = (newval)
+
+static inline uint8_t ATTR_UNUSED ATTR_NONNULL(1)
+wtiGetPrevWasSuspended(const wti_t * const pWti)
+{
+ assert(pWti != NULL);
+ return pWti->execState.bPrevWasSuspended;
+}
+
+static inline void __attribute__((unused))
+wtiResetExecState(wti_t * const pWti, batch_t * const pBatch)
+{
+ wtiSetScriptErrno(pWti, 0);
+ pWti->execState.bPrevWasSuspended = 0;
+ pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1);
+}
+
+
+rsRetVal wtiNewIParam(wti_t *const pWti, action_t *const pAction, actWrkrIParams_t **piparams);
+#endif /* #ifndef WTI_H_INCLUDED */