summaryrefslogtreecommitdiffstats
path: root/runtime/wti.h
blob: 71eed5679471a0ce338335bcdeeac6366985c822 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/* Definition of the worker thread instance (wti) class.
 *
 * Copyright 2008-2017 Adiscon GmbH.
 *
 * This file is part of the rsyslog runtime library.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *       -or-
 *       see COPYING.ASL20 in the source distribution
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef WTI_H_INCLUDED
#define WTI_H_INCLUDED

#include <pthread.h>
#include <stdlib.h>
#include "wtp.h"
#include "obj.h"
#include "batch.h"
#include "action.h"


#define ACT_STATE_RDY  0	/* action ready, waiting for new transaction */
#define ACT_STATE_ITX  1	/* transaction active, waiting for new data or commit */
/* 2 currently not being used */
#define ACT_STATE_RTRY 3	/* failure occurred, trying to restablish ready state */
#define ACT_STATE_SUSP 4	/* suspended due to failure (return fail until timeout expired) */
#define ACT_STATE_DATAFAIL 5	/* suspended due to failure in data, which means the message in
				   questions needs to be dropped as it will always fail. The
				   action must still do a "normal" retry in order to bring
				   it back to regular state. */
/* note: 3 bit bit field --> highest value is 7! */

typedef struct actWrkrInfo {
	action_t *pAction;
	void *actWrkrData;
	uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an
				   immediate failure following */
	int	iNbrResRtry;	/* number of retries since last suspend */
	sbool	bHadAutoCommit;	/* did an auto-commit happen during doAction()? */
	struct {
		unsigned actState : 3;
	} flags;
	union {
		struct {
			actWrkrIParams_t *iparams;/* dynamically sized array for transactional outputs */
			int currIParam;
			int maxIParams;	/* current max */
		} tx;
		struct {
			actWrkrIParams_t actParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
		} nontx;
	} p; /* short name for "parameters" */
} actWrkrInfo_t;

/* the worker thread instance class */
struct wti_s {
	BEGINobjInstance;
	pthread_t thrdID; 	/* thread ID */
	int bIsRunning;	/* is this thread currently running? (must be int for atomic op!) */
	sbool bAlwaysRunning;	/* should this thread always run? */
	int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
	wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
	batch_t batch; /* pointer to an object array meaningful for current user
			  pointer (e.g. queue pUsr data elemt) */
	uchar *pszDbgHdr;	/* header string for debug messages */
	actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions
				      (sized for max nbr of actions in config!) */
	pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */
	DEF_ATOMIC_HELPER_MUT(mutIsRunning)
	struct {
		uint8_t	script_errno; /* errno-type interface for RainerScript functions */
		uint8_t bPrevWasSuspended;
		uint8_t bDoAutoCommit; /* do a commit after each message
		                        * this is usually set for batches with 0 element, but may
					* also be added as a user-selectable option (not implemented yet)
					*/
	} execState;	/* state for the execution engine */
};


/* prototypes */
rsRetVal wtiConstruct(wti_t **ppThis);
rsRetVal wtiConstructFinalize(wti_t * const pThis);
rsRetVal wtiDestruct(wti_t **ppThis);
rsRetVal wtiWorker(wti_t * const pThis);
rsRetVal wtiSetDbgHdr(wti_t * const pThis, uchar *pszMsg, size_t lenMsg);
uchar * ATTR_NONNULL() wtiGetDbgHdr(const wti_t *const pThis);
rsRetVal wtiCancelThrd(wti_t * const pThis, const uchar *const cancelobj);
void ATTR_NONNULL() wtiJoinThrd(wti_t *const pThis);
rsRetVal wtiSetAlwaysRunning(wti_t * const pThis);
rsRetVal wtiSetState(wti_t * const pThis, int bNew);
rsRetVal wtiWakeupThrd(wti_t * const pThis);
int wtiGetState(wti_t * const pThis);
wti_t *wtiGetDummy(void);
int ATTR_NONNULL() wtiWaitNonEmpty(wti_t *const pThis, const struct timespec timeout);
PROTOTYPEObjClassInit(wti);
PROTOTYPEObjClassExit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);

#define getActionStateByNbr(pWti, iActNbr) ((uint8_t) ((pWti)->actWrkrInfo[(iActNbr)].flags.actState))
#define getActionState(pWti, pAction) (((uint8_t) (pWti)->actWrkrInfo[(pAction)->iActionNbr].flags.actState))
#define setActionState(pWti, pAction, newState) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].flags.actState = \
(newState))
#define getActionResumeInRow(pWti, pAction) (((pWti)->actWrkrInfo[(pAction)->iActionNbr].uResumeOKinRow))
#define setActionResumeInRow(pWti, pAction, val) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].uResumeOKinRow = (val))
#define incActionResumeInRow(pWti, pAction) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].uResumeOKinRow++)
#define getActionNbrResRtry(pWti, pAction) (((pWti)->actWrkrInfo[(pAction)->iActionNbr].iNbrResRtry))
#define setActionNbrResRtry(pWti, pAction, val) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].iNbrResRtry = (val))
#define incActionNbrResRtry(pWti, pAction) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].iNbrResRtry++)
#define wtiInitIParam(piparams) (memset((piparams), 0, sizeof(actWrkrIParams_t)))

#define wtiGetScriptErrno(pWti) ((pWti)->execState.script_errno)
#define wtiSetScriptErrno(pWti, newval) (pWti)->execState.script_errno = (newval)

static inline uint8_t ATTR_UNUSED ATTR_NONNULL(1)
wtiGetPrevWasSuspended(const wti_t * const pWti)
{
	assert(pWti != NULL);
	return pWti->execState.bPrevWasSuspended;
}

static inline void __attribute__((unused))
wtiResetExecState(wti_t * const pWti, batch_t * const pBatch)
{
	wtiSetScriptErrno(pWti, 0);
	pWti->execState.bPrevWasSuspended = 0;
	pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1);
}


rsRetVal wtiNewIParam(wti_t *const pWti, action_t *const pAction, actWrkrIParams_t **piparams);
#endif /* #ifndef WTI_H_INCLUDED */