summaryrefslogtreecommitdiffstats
path: root/runtime/stream.h
blob: 38db1aba0cc79af2f2485433e76f6b8605d77ac2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
/* Definition of serial stream class (strm).
 *
 * A serial stream provides serial data access. In theory, serial streams
 * can be implemented via a number of methods (e.g. files or in-memory
 * streams). In practice, there currently only exist the file type (aka
 * "driver").
 *
 * In practice, many stream features are bound to files. I have not yet made
 * any serious effort, except for the naming of this class, to try to make
 * the interfaces very generic. However, I assume that we could work much
 * like in the strm class, where some properties are simply ignored when
 * the wrong strm mode is selected (which would translate here to the wrong
 * stream mode).
 *
 * Most importantly, this class provides generic input and output functions
 * which can directly be used to work with the strms and file output. It
 * provides such useful things like a circular file buffer and, hopefully
 * at a later stage, a lazy writer. The object is also seriazable and thus
 * can easily be persistet. The bottom line is that it makes much sense to
 * use this class whereever possible as its features may grow in the future.
 *
 * An important note on writing gzip format via zlib (kept anonymous
 * by request):
 *
 * --------------------------------------------------------------------------
 * We'd like to make sure the output file is in full gzip format
 * (compatible with gzip -d/zcat etc).  There is a flag in how the output
 * is initialized within zlib to properly add the gzip wrappers to the
 * output.  (gzip is effectively a small metadata wrapper around raw
 * zstream output.)
 *
 * I had written an old bit of code to do this - the documentation on
 * deflatInit2() was pretty tricky to nail down on this specific feature:
 *
 * int deflateInit2 (z_streamp strm, int level, int method, int windowBits,
 * int memLevel, int strategy);
 *
 * I believe "31" would be the value for the "windowBits" field that you'd
 * want to try:
 *
 * deflateInit2(zstrmptr, 6, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
 * --------------------------------------------------------------------------
 *
 * Copyright 2008-2018 Rainer Gerhards and Adiscon GmbH.
 *
 * This file is part of the rsyslog runtime library.
 *
 * The rsyslog runtime library is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * The rsyslog runtime library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with the rsyslog runtime library.  If not, see <http://www.gnu.org/licenses/>.
 *
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
 * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
 */

#ifndef STREAM_H_INCLUDED
#define STREAM_H_INCLUDED

typedef struct strm_s strm_t; /* forward reference because of zlib... */

#include <regex.h> // TODO: fix via own module
#include <pthread.h>
#include <stdint.h>
#include <time.h>
#include "obj-types.h"
#include "glbl.h"
#include "stream.h"
#include "zlibw.h"
#include "cryprov.h"

/* stream types */
typedef enum {
	STREAMTYPE_FILE_SINGLE = 0,	/**< read a single file */
	STREAMTYPE_FILE_CIRCULAR = 1,	/**< circular files */
	STREAMTYPE_FILE_MONITOR = 2,	/**< monitor a (third-party) file */
	STREAMTYPE_NAMED_PIPE = 3	/**< file is a named pipe (so far, tested for output only) */
} strmType_t;

typedef enum {				/* when extending, do NOT change existing modes! */
	STREAMMMODE_INVALID = 0,
	STREAMMODE_READ = 1,
	STREAMMODE_WRITE = 2,
	STREAMMODE_WRITE_TRUNC = 3,
	STREAMMODE_WRITE_APPEND = 4
} strmMode_t;
typedef enum {
		STRM_COMPRESS_ZIP = 0,
		STRM_COMPRESS_ZSTD = 1
	} strm_compressionDriver_t;

#define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */
/* The strm_t data structure */
struct strm_s {
	BEGINobjInstance;	/* Data to implement generic object - MUST be the first data element! */
	strmType_t sType;
	/* descriptive properties */
	unsigned int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */
	uchar *pszFName; /* prefix for generated filenames */
	int lenFName;
	strmMode_t tOperationsMode;
	mode_t tOpenMode;
	int64 iMaxFileSize;/* maximum size a file may grow to */
	unsigned int iMaxFiles;	/* maximum number of files if a circular mode is in use */
	int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
	sbool bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
	int64 iCurrOffs;/* current offset */
	int64 *pUsrWCntr; /* NULL or a user-provided counter that receives the nbr of bytes written since
	the last CntrSet() */
	sbool bPrevWasNL; /* used for readLine() when reading multi-line messages */
	/* dynamic properties, valid only during file open, not to be persistet */
	sbool bDisabled; /* should file no longer be written to? (currently set only if omfile file size limit fails) */
	sbool bSync;	/* sync this file after every write? */
	sbool bReopenOnTruncate;
	int rotationCheck; /* rotation check mode */
	size_t sIOBufSize;/* size of IO buffer */
	uchar *pszDir; /* Directory */
	int lenDir;
	int fd;		/* the file descriptor, -1 if closed */
	int fdDir;	/* the directory's descriptor, in case bSync is requested (-1 if closed) */
	int readTimeout;/* 0: do not timeout */
	time_t lastRead;/* for timeout processing */
	ino_t inode;	/* current inode for files being monitored (undefined else) */
	uchar *pszCurrFName; /* name of current file (if open) */
	uchar *pIOBuf;	/* the iobuffer currently in use to gather data */
	char *pIOBuf_truncation; /* iobuffer used during trucation detection block re-reads */
	size_t iBufPtrMax;	/* current max Ptr in Buffer (if partial read!) */
	size_t iBufPtr;	/* pointer into current buffer */
	int iUngetC;	/* char set via UngetChar() call or -1 if none set */
	sbool bInRecord;	/* if 1, indicates that we are currently writing a not-yet complete record */
	int iZipLevel;	/* zip level (0..9). If 0, zip is completely disabled */
	Bytef *pZipBuf;
	/* support for async flush procesing */
	sbool bAsyncWrite;	/* do asynchronous writes (always if a flush interval is given) */
	sbool bStopWriter;	/* shall writer thread terminate? */
	sbool bDoTimedWait;	/* instruct writer thread to do a times wait to support flush timeouts */
	sbool bzInitDone;	/* did we do an init of zstrm already? */
	sbool bFlushNow;	/* shall we flush with the next async write? */
	sbool bVeryReliableZip; /* shall we write interim headers to create a very reliable ZIP file? */
	int iFlushInterval; /* flush in which interval - 0, no flushing */
	pthread_mutex_t mut;/* mutex for flush in async mode */
	pthread_cond_t notFull;
	pthread_cond_t notEmpty;
	pthread_cond_t isEmpty;
	unsigned short iEnq;	/* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
	unsigned short iDeq;	/* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
	cryprov_if_t *cryprov;  /* ptr to crypto provider; NULL = do not encrypt */
	void	*cryprovData;	/* opaque data ptr for provider use */
	void 	*cryprovFileData;/* opaque data ptr for file instance */
	short iCnt;	/* current nbr of elements in buffer */
	z_stream zstrm;	/* zip stream to use */
	struct {
		uchar *pBuf;
		size_t lenBuf;
	} asyncBuf[STREAM_ASYNC_NUMBUFS];
	struct {
		int num_wrkrs;	/* nbr of worker threads */
		void *cctx;
	} zstd;		/* supporting per-instance data if zstd is used */
	pthread_t writerThreadID;
	/* support for omfile size-limiting commands, special counters, NOT persisted! */
	off_t	iSizeLimit;	/* file size limit, 0 = no limit */
	uchar	*pszSizeLimitCmd;	/* command to carry out when size limit is reached */
	sbool	bIsTTY;		/* is this a tty file? */
	cstr_t *prevLineSegment; /* for ReadLine, previous, unprocessed part of file */
	cstr_t *prevMsgSegment; /* for ReadMultiLine, previous, yet unprocessed part of msg */
	int64 strtOffs;		/* start offset in file for current line/msg */
	int fileNotFoundError;	/* boolean; if set, report file not found errors, else silently ignore */
	int noRepeatedErrorOutput; /* if a file is missing the Error is only given once */
	int ignoringMsg;
	strm_compressionDriver_t compressionDriver;
};


/* interfaces */
BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
	rsRetVal (*Construct)(strm_t **ppThis);
	rsRetVal (*ConstructFinalize)(strm_t *pThis);
	rsRetVal (*Destruct)(strm_t **ppThis);
	rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName);
	rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC);
	rsRetVal (*UnreadChar)(strm_t *pThis, uchar c);
	rsRetVal (*SeekCurrOffs)(strm_t *pThis);
	rsRetVal (*Write)(strm_t *const pThis, const uchar *const pBuf, size_t lenBuf);
	rsRetVal (*WriteChar)(strm_t *pThis, uchar c);
	rsRetVal (*WriteLong)(strm_t *pThis, long i);
	rsRetVal (*SetFileNotFoundError)(strm_t *pThis, int pFileNotFoundError);
	rsRetVal (*SetFName)(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
	rsRetVal (*SetDir)(strm_t *pThis, uchar *pszDir, size_t iLenDir);
	rsRetVal (*Flush)(strm_t *pThis);
	rsRetVal (*RecordBegin)(strm_t *pThis);
	rsRetVal (*RecordEnd)(strm_t *pThis);
	rsRetVal (*Serialize)(strm_t *pThis, strm_t *pStrm);
	rsRetVal (*GetCurrOffset)(strm_t *pThis, int64 *pOffs);
	rsRetVal (*SetWCntr)(strm_t *pThis, number_t *pWCnt);
	rsRetVal (*Dup)(strm_t *pThis, strm_t **ppNew);
	rsRetVal (*SetCompressionWorkers)(strm_t *const pThis, int num_wrkrs);
	INTERFACEpropSetMeth(strm, bDeleteOnClose, int);
	INTERFACEpropSetMeth(strm, iMaxFileSize, int64);
	INTERFACEpropSetMeth(strm, iMaxFiles, int);
	INTERFACEpropSetMeth(strm, iFileNumDigits, int);
	INTERFACEpropSetMeth(strm, tOperationsMode, int);
	INTERFACEpropSetMeth(strm, tOpenMode, mode_t);
	INTERFACEpropSetMeth(strm, compressionDriver, strm_compressionDriver_t);
	INTERFACEpropSetMeth(strm, sType, strmType_t);
	INTERFACEpropSetMeth(strm, iZipLevel, int);
	INTERFACEpropSetMeth(strm, bSync, int);
	INTERFACEpropSetMeth(strm, bReopenOnTruncate, int);
	INTERFACEpropSetMeth(strm, sIOBufSize, size_t);
	INTERFACEpropSetMeth(strm, iSizeLimit, off_t);
	INTERFACEpropSetMeth(strm, iFlushInterval, int);
	INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*);
	/* v6 added */
	rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, const uchar *,
		uint32_t trimLineOverBytes, int64 *const strtOffs);
	/* v7 added  2012-09-14 */
	INTERFACEpropSetMeth(strm, bVeryReliableZip, int);
	/* v8 added  2013-03-21 */
	rsRetVal (*CheckFileChange)(strm_t *pThis);
	/* v9 added  2013-04-04 */
	INTERFACEpropSetMeth(strm, cryprov, cryprov_if_t*);
	INTERFACEpropSetMeth(strm, cryprovData, void*);
ENDinterface(strm)
#define strmCURR_IF_VERSION 14 /* increment whenever you change the interface structure! */
/* V10, 2013-09-10: added new parameter bEscapeLF, changed mode to uint8_t (rgerhards) */
/* V11, 2015-12-03: added new parameter bReopenOnTruncate */
/* V12, 2015-12-11: added new parameter trimLineOverBytes, changed mode to uint32_t */
/* V13, 2017-09-06: added new parameter strtoffs to ReadLine() */
/* V14, 2019-11-13: added new parameter bEscapeLFString (rgerhards) */

#define strmGetCurrFileNum(pStrm) ((pStrm)->iCurrFNum)

/* prototypes */
PROTOTYPEObjClassInit(strm);
rsRetVal strmMultiFileSeek(strm_t *pThis, unsigned int fileNum, off64_t offs, off64_t *bytesDel);
rsRetVal ATTR_NONNULL(1,2) strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *start_preg,
	regex_t *end_preg, const sbool bEscapeLF, const uchar *const escapeLFString,
	const sbool discardTruncatedMsg, const sbool msgDiscardingError, int64 *const strtOffs);
int strmReadMultiLine_isTimedOut(const strm_t *const __restrict__ pThis);
void strmDebugOutBuf(const strm_t *const pThis);
void strmSetReadTimeout(strm_t *const __restrict__ pThis, const int val);
const uchar * ATTR_NONNULL() strmGetPrevLineSegment(strm_t *const pThis);
const uchar * ATTR_NONNULL() strmGetPrevMsgSegment(strm_t *const pThis);
int ATTR_NONNULL() strmGetPrevWasNL(const strm_t *const pThis);

#endif /* #ifndef STREAM_H_INCLUDED */