From dcc721a95bef6f0d8e6d8775b8efe33e5aecd562 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 15 Apr 2024 18:28:20 +0200 Subject: Adding upstream version 8.2402.0. Signed-off-by: Daniel Baumann --- runtime/stream.c | 2533 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 2533 insertions(+) create mode 100644 runtime/stream.c (limited to 'runtime/stream.c') diff --git a/runtime/stream.c b/runtime/stream.c new file mode 100644 index 0000000..73d63e4 --- /dev/null +++ b/runtime/stream.c @@ -0,0 +1,2533 @@ +/* The serial stream class. + * + * A serial stream provides serial data access. In theory, serial streams + * can be implemented via a number of methods (e.g. files or in-memory + * streams). In practice, there currently only exist the file type (aka + * "driver"). + * + * File begun on 2008-01-09 by RGerhards + * Large modifications in 2009-06 to support using it with omfile, including zip writer. + * Note that this file obtains the zlib wrapper object is needed, but it never frees it + * again. While this sounds like a leak (and one may argue it actually is), there is no + * harm associated with that. The reason is that strm is a core object, so it is terminated + * only when rsyslogd exists. As we could only release on termination (or else bear more + * overhead for keeping track of how many users we have), not releasing zlibw is OK, because + * it will be released when rsyslogd terminates. We may want to revisit this decision if + * it turns out to be problematic. Then, we need to quasi-refcount the number of accesses + * to the object. + * + * Copyright 2008-2022 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include /* required for HP UX */ +#include +#include +#include +#ifdef HAVE_SYS_PRCTL_H +# include +#endif + +#include "rsyslog.h" +#include "stringbuf.h" +#include "srUtils.h" +#include "obj.h" +#include "stream.h" +#include "unicode-helper.h" +#include "module-template.h" +#include "errmsg.h" +#include "zstdw.h" +#include "cryprov.h" +#include "datetime.h" +#include "rsconf.h" + +/* some platforms do not have large file support :( */ +#ifndef O_LARGEFILE +# define O_LARGEFILE 0 +#endif +#ifndef HAVE_LSEEK64 +# define lseek64(fd, offset, whence) lseek(fd, offset, whence) +#endif + +/* static data */ +DEFobjStaticHelpers +DEFobjCurrIf(zlibw) +DEFobjCurrIf(zstdw) + +/* forward definitions */ +static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip); +static rsRetVal strmWrite(strm_t *__restrict__ const pThis, const uchar *__restrict__ const pBuf, + const size_t lenBuf); +static rsRetVal strmOpenFile(strm_t *pThis); +static rsRetVal strmCloseFile(strm_t *pThis); +static void *asyncWriterThread(void *pPtr); +static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush); +static rsRetVal doZipFinish(strm_t *pThis); +static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmSeekCurrOffs(strm_t *pThis); + + +/* methods */ + + 
+/* note: this may return NULL if not line segment is currently set */ +// TODO: due to the cstrFinalize() this is not totally clean, albeit for our +// current use case it does not hurt -- refactor! rgerhards, 2018-03-27 +const uchar * ATTR_NONNULL() +strmGetPrevLineSegment(strm_t *const pThis) +{ + const uchar *ret = NULL; + if(pThis->prevLineSegment != NULL) { + cstrFinalize(pThis->prevLineSegment); + ret = rsCStrGetSzStrNoNULL(pThis->prevLineSegment); + } + return ret; +} +/* note: this may return NULL if not line segment is currently set */ +// TODO: due to the cstrFinalize() this is not totally clean, albeit for our +// current use case it does not hurt -- refactor! rgerhards, 2018-03-27 +const uchar * ATTR_NONNULL() +strmGetPrevMsgSegment(strm_t *const pThis) +{ + const uchar *ret = NULL; + if(pThis->prevMsgSegment != NULL) { + cstrFinalize(pThis->prevMsgSegment); + ret = rsCStrGetSzStrNoNULL(pThis->prevMsgSegment); + } + return ret; +} + + +int ATTR_NONNULL() +strmGetPrevWasNL(const strm_t *const pThis) +{ + return pThis->bPrevWasNL; +} + + +/* output (current) file name for debug log purposes. Falls back to various + * levels of impreciseness if more precise name is not known. + */ +static const char * +getFileDebugName(const strm_t *const pThis) +{ + return (pThis->pszCurrFName == NULL) ? + ((pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName) + : (const char*) pThis->pszCurrFName; +} + +/* Try to resolve a size limit situation. This is used to support custom-file size handlers + * for omfile. It first runs the command, and then checks if we are still above the size + * treshold. Note that this works only with single file names, NOT with circular names. + * Note that pszCurrFName can NOT be taken from pThis, because the stream is closed when + * we are called (and that destroys pszCurrFName, as there is NO CURRENT file name!). So + * we need to receive the name as a parameter. 
+ * initially wirtten 2005-06-21, moved to this class & updates 2009-06-01, both rgerhards + */ +static rsRetVal +resolveFileSizeLimit(strm_t *pThis, uchar *pszCurrFName) +{ + uchar *pParams; + uchar *pCmd; + uchar *p; + off_t actualFileSize; + rsRetVal localRet; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + assert(pszCurrFName != NULL); + + if(pThis->pszSizeLimitCmd == NULL) { + ABORT_FINALIZE(RS_RET_NON_SIZELIMITCMD); /* nothing we can do in this case... */ + } + + /* we first check if we have command line parameters. We assume this, + * when we have a space in the program name. If we find it, everything after + * the space is treated as a single argument. + */ + CHKmalloc(pCmd = ustrdup(pThis->pszSizeLimitCmd)); + + for(p = pCmd ; *p && *p != ' ' ; ++p) { + /* JUST SKIP */ + } + + if(*p == ' ') { + *p = '\0'; /* pretend string-end */ + pParams = p+1; + } else + pParams = NULL; + + /* the execProg() below is probably not great, but at least is is + * fairly secure now. Once we change the way file size limits are + * handled, we should also revisit how this command is run (and + * with which parameters). rgerhards, 2007-07-20 + */ + execProg(pCmd, 1, pParams); + + free(pCmd); + + localRet = getFileSize(pszCurrFName, &actualFileSize); + + if(localRet == RS_RET_OK && actualFileSize >= pThis->iSizeLimit) { + ABORT_FINALIZE(RS_RET_SIZELIMITCMD_DIDNT_RESOLVE); /* OK, it didn't work out... 
*/ + } else if(localRet != RS_RET_FILE_NOT_FOUND) { + /* file not found is OK, the command may have moved away the file */ + ABORT_FINALIZE(localRet); + } + +finalize_it: + if(iRet != RS_RET_OK) { + if(iRet == RS_RET_SIZELIMITCMD_DIDNT_RESOLVE) { + LogError(0, RS_RET_ERR, "file size limit cmd for file '%s' " + "did no resolve situation\n", pszCurrFName); + } else { + LogError(0, RS_RET_ERR, "file size limit cmd for file '%s' " + "failed with code %d.\n", pszCurrFName, iRet); + } + pThis->bDisabled = 1; + } + + RETiRet; +} + + +/* Check if the file has grown beyond the configured omfile iSizeLimit + * and, if so, initiate processing. + */ +static rsRetVal +doSizeLimitProcessing(strm_t *pThis) +{ + uchar *pszCurrFName = NULL; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + assert(pThis->iSizeLimit != 0); + assert(pThis->fd != -1); + + if(pThis->iCurrOffs >= pThis->iSizeLimit) { + /* strmCloseFile() destroys the current file name, so we + * need to preserve it. + */ + CHKmalloc(pszCurrFName = ustrdup(pThis->pszCurrFName)); + CHKiRet(strmCloseFile(pThis)); + CHKiRet(resolveFileSizeLimit(pThis, pszCurrFName)); + } + +finalize_it: + free(pszCurrFName); + RETiRet; +} + + +/* now, we define type-specific handlers. The provide a generic functionality, + * but for this specific type of strm. The mapping to these handlers happens during + * strm construction. Later on, handlers are called by pointers present in the + * strm instance object. + */ + +/* do the physical open() call on a file. 
+ */ +static rsRetVal +doPhysOpen(strm_t *pThis) +{ + int iFlags = 0; + struct stat statOpen; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + /* compute which flags we need to provide to open */ + switch(pThis->tOperationsMode) { + case STREAMMODE_READ: + iFlags = O_CLOEXEC | O_NOCTTY | O_RDONLY; + break; + case STREAMMODE_WRITE: /* legacy mode used inside queue engine */ + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT; + break; + case STREAMMODE_WRITE_TRUNC: + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_TRUNC; + break; + case STREAMMODE_WRITE_APPEND: + iFlags = O_CLOEXEC | O_NOCTTY | O_WRONLY | O_CREAT | O_APPEND; + break; + case STREAMMMODE_INVALID: + default:assert(0); + break; + } + if(pThis->sType == STREAMTYPE_NAMED_PIPE) { + DBGPRINTF("Note: stream '%s' is a named pipe, open with O_NONBLOCK\n", pThis->pszCurrFName); + iFlags |= O_NONBLOCK; + } + + if(pThis->bAsyncWrite)d_pthread_mutex_lock(&pThis->mut); + pThis->fd = open((char*)pThis->pszCurrFName, iFlags | O_LARGEFILE, pThis->tOpenMode); + if(pThis->bAsyncWrite) d_pthread_mutex_unlock(&pThis->mut); + + const int errno_save = errno; /* dbgprintf can mangle it! */ + DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName, + pThis->fd, (int) pThis->tOpenMode); + if(pThis->fd == -1) { + const rsRetVal errcode = (errno_save == ENOENT) ? 
RS_RET_FILE_NOT_FOUND + : RS_RET_FILE_OPEN_ERROR; + if(pThis->fileNotFoundError) { + if(pThis->noRepeatedErrorOutput == 0) { + LogError(errno_save, errcode, "file '%s': open error", pThis->pszCurrFName); + pThis->noRepeatedErrorOutput = 1; + } + } else { + DBGPRINTF("file '%s': open error", pThis->pszCurrFName); + } + ABORT_FINALIZE(errcode); + } else { + pThis->noRepeatedErrorOutput = 0; + } + + if(pThis->tOperationsMode == STREAMMODE_READ) { + if(fstat(pThis->fd, &statOpen) == -1) { + DBGPRINTF("Error: cannot obtain inode# for file %s\n", pThis->pszCurrFName); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + pThis->inode = statOpen.st_ino; + } + + if(!ustrcmp(pThis->pszCurrFName, UCHAR_CONSTANT(_PATH_CONSOLE)) || isatty(pThis->fd)) { + DBGPRINTF("file %d is a tty-type file\n", pThis->fd); + pThis->bIsTTY = 1; + } else { + pThis->bIsTTY = 0; + } + + if(pThis->cryprov != NULL) { + CHKiRet(pThis->cryprov->OnFileOpen(pThis->cryprovData, + pThis->pszCurrFName, &pThis->cryprovFileData, + (pThis->tOperationsMode == STREAMMODE_READ) ? 'r' : 'w')); + pThis->cryprov->SetDeleteOnClose(pThis->cryprovFileData, pThis->bDeleteOnClose); + } + +finalize_it: + RETiRet; +} + + +static rsRetVal +strmSetCurrFName(strm_t *pThis) +{ + DEFiRet; + + if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) { + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits)); + } else { + if(pThis->pszDir == NULL) { + if((pThis->pszCurrFName = ustrdup(pThis->pszFName)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } else { + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFName, pThis->lenFName, -1, 0)); + } + } +finalize_it: + RETiRet; +} + +/* This function checks if the actual file has changed and, if so, resets the + * offset. This is support for monitoring files. 
It should be called after + * deserializing the strm object and before doing any other operation on it + * (most importantly not an open or seek!). + */ +static rsRetVal +CheckFileChange(strm_t *pThis) +{ + struct stat statName; + DEFiRet; + + CHKiRet(strmSetCurrFName(pThis)); + if(stat((char*) pThis->pszCurrFName, &statName) == -1) + ABORT_FINALIZE(RS_RET_IO_ERROR); + DBGPRINTF("CheckFileChange: stream/after deserialize checking for file change " + "on '%s', inode %u/%u, size/currOffs %llu/%llu\n", + pThis->pszCurrFName, (unsigned) pThis->inode, + (unsigned) statName.st_ino, + (long long unsigned) statName.st_size, + (long long unsigned) pThis->iCurrOffs); + if(pThis->inode != statName.st_ino || statName.st_size < pThis->iCurrOffs) { + DBGPRINTF("stream: file %s has changed\n", pThis->pszCurrFName); + pThis->iCurrOffs = 0; + } +finalize_it: + RETiRet; +} + + +/* open a strm file + * It is OK to call this function when the stream is already open. In that + * case, it returns immediately with RS_RET_OK + */ +static rsRetVal strmOpenFile(strm_t *pThis) +{ + DEFiRet; + off_t offset; + + assert(pThis != NULL); + + if(pThis->fd != -1) + ABORT_FINALIZE(RS_RET_OK); + + free(pThis->pszCurrFName); + pThis->pszCurrFName = NULL; /* used to prevent mem leak in case of error */ + + if(pThis->pszFName == NULL) + ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); + + CHKiRet(strmSetCurrFName(pThis)); + + CHKiRet(doPhysOpen(pThis)); + + pThis->iCurrOffs = 0; + pThis->iBufPtrMax = 0; + CHKiRet(getFileSize(pThis->pszCurrFName, &offset)); + if(pThis->tOperationsMode == STREAMMODE_WRITE_APPEND) { + pThis->iCurrOffs = offset; + } else if(pThis->tOperationsMode == STREAMMODE_WRITE_TRUNC) { + if(offset != 0) { + LogError(0, 0, "file '%s' opened for truncate write, but " + "already contains %zd bytes\n", + pThis->pszCurrFName, (ssize_t) offset); + } + } + + DBGOPRINT((obj_t*) pThis, "opened file '%s' for %s as %d\n", pThis->pszCurrFName, + (pThis->tOperationsMode == STREAMMODE_READ) ? 
"READ" : "WRITE", pThis->fd); + +finalize_it: + if(iRet == RS_RET_OK) { + assert(pThis->fd != -1); + } else { + if(pThis->pszCurrFName != NULL) { + free(pThis->pszCurrFName); + pThis->pszCurrFName = NULL; /* just to prevent mis-adressing down the road... */ + } + if(pThis->fd != -1) { + close(pThis->fd); + pThis->fd = -1; + } + } + RETiRet; +} + + +/* wait for the output writer thread to be done. This must be called before actions + * that require data to be persisted. May be called in non-async mode and is a null + * operation than. Must be called with the mutex locked. + */ +static void +strmWaitAsyncWriterDone(strm_t *pThis) +{ + if(pThis->bAsyncWrite) { + /* awake writer thread and make it write out everything */ + while(pThis->iCnt > 0) { + pthread_cond_signal(&pThis->notEmpty); + d_pthread_cond_wait(&pThis->isEmpty, &pThis->mut); + } + } +} + +/* stop the writer thread (we MUST be runnnig asynchronously when this method + * is called!). Note that the mutex must be locked! -- rgerhards, 2009-07-06 + */ +static void +stopWriter(strm_t *const pThis) +{ + pThis->bStopWriter = 1; + pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(&pThis->mut); + pthread_join(pThis->writerThreadID, NULL); +} + + + +/* close a strm file + * Note that the bDeleteOnClose flag is honored. If it is set, the file will be + * deleted after close. This is in support for the qRead thread. + * Note: it is valid to call this function when the physical file is closed. If so, + * strmCloseFile() will still check if there is any unwritten data inside buffers + * (this may be the case) and, if so, will open the file, write the data, and then + * close it again (this is done via strmFlushInternal and friends). 
+ */ +static rsRetVal strmCloseFile(strm_t *pThis) +{ + off64_t currOffs; + DEFiRet; + + assert(pThis != NULL); + DBGOPRINT((obj_t*) pThis, "file %d(%s) closing, bDeleteOnClose %d\n", pThis->fd, + getFileDebugName(pThis), pThis->bDeleteOnClose); + + if(pThis->tOperationsMode != STREAMMODE_READ) { + if(pThis->bAsyncWrite) { + strmWaitAsyncWriterDone(pThis); + } + strmFlushInternal(pThis, 0); + if(pThis->iZipLevel) { + doZipFinish(pThis); + } + if(pThis->bAsyncWrite) { + stopWriter(pThis); + } + } + + /* if we have a signature provider, we must make sure that the crypto + * state files are opened and proper close processing happens. */ + if(pThis->cryprov != NULL && pThis->fd == -1) { + const rsRetVal localRet = strmOpenFile(pThis); + if(localRet != RS_RET_OK) { + LogError(0, localRet, "could not open file %s, this " + "may result in problems with encryption - " + "unfortunately, we cannot do anything against " + "this.", pThis->pszCurrFName); + } + } + + /* the file may already be closed (or never have opened), so guard + * against this. 
-- rgerhards, 2010-03-19 + */ + if(pThis->fd != -1) { + DBGOPRINT((obj_t*) pThis, "file %d(%s) closing\n", + pThis->fd, getFileDebugName(pThis)); + currOffs = lseek64(pThis->fd, 0, SEEK_CUR); + close(pThis->fd); + pThis->fd = -1; + pThis->inode = 0; + if(pThis->cryprov != NULL) { + pThis->cryprov->OnFileClose(pThis->cryprovFileData, currOffs); + pThis->cryprovFileData = NULL; + } + } + + if(pThis->fdDir != -1) { + /* close associated directory handle, if it is open */ + close(pThis->fdDir); + pThis->fdDir = -1; + } + + if(pThis->bDeleteOnClose) { + if(pThis->pszCurrFName == NULL) { + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, + pThis->iFileNumDigits)); + } + DBGPRINTF("strmCloseFile: deleting '%s'\n", pThis->pszCurrFName); + if(unlink((char*) pThis->pszCurrFName) == -1) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("error %d unlinking '%s' - ignored: %s\n", + errno, pThis->pszCurrFName, errStr); + } + } + + pThis->iCurrOffs = 0; /* we are back at begin of file */ + +finalize_it: + free(pThis->pszCurrFName); + pThis->pszCurrFName = NULL; + RETiRet; +} + + +/* switch to next strm file + * This method must only be called if we are in a multi-file mode! + */ +static rsRetVal +strmNextFile(strm_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + assert(pThis->sType == STREAMTYPE_FILE_CIRCULAR); + assert(pThis->iMaxFiles != 0); + assert(pThis->fd != -1); + + CHKiRet(strmCloseFile(pThis)); + + /* we do modulo operation to ensure we obey the iMaxFile property. This will always + * result in a file number lower than iMaxFile, so it if wraps, the name is back to + * 0, which results in the first file being overwritten. Not desired for queues, so + * make sure their iMaxFiles is large enough. But it is well-desired for other + * use cases, e.g. a circular output log file. 
-- rgerhards, 2008-01-10 + */ + pThis->iCurrFNum = (pThis->iCurrFNum + 1) % pThis->iMaxFiles; + +finalize_it: + RETiRet; +} + + +/* handle the EOF case of a stream + * The EOF case is somewhat complicated, as the proper action depends on the + * mode the stream is in. If there are multiple files (circular logs, most + * important use case is queue files!), we need to close the current file and + * try to open the next one. + * rgerhards, 2008-02-13 + */ +static rsRetVal ATTR_NONNULL() +strmHandleEOF(strm_t *const pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + switch(pThis->sType) { + case STREAMTYPE_FILE_SINGLE: + case STREAMTYPE_NAMED_PIPE: + ABORT_FINALIZE(RS_RET_EOF); + break; + case STREAMTYPE_FILE_CIRCULAR: + /* we have multiple files and need to switch to the next one */ + /* TODO: think about emulating EOF in this case (not yet needed) */ + DBGOPRINT((obj_t*) pThis, "file %d EOF\n", pThis->fd); + CHKiRet(strmNextFile(pThis)); + break; + case STREAMTYPE_FILE_MONITOR: + DBGOPRINT((obj_t*) pThis, "file '%s' (%d) EOF, rotationCheck %d\n", + pThis->pszCurrFName, pThis->fd, pThis->rotationCheck); +DBGPRINTF("RGER: EOF!\n"); + ABORT_FINALIZE(RS_RET_EOF); + break; + } + +finalize_it: + RETiRet; +} + + +/* helper to checkTruncation */ +static rsRetVal ATTR_NONNULL() +rereadTruncated(strm_t *const pThis, const int err_no, const char *const reason, const long long data) +{ + DEFiRet; + + LogMsg(err_no, RS_RET_FILE_TRUNCATED, LOG_WARNING, "file '%s': truncation detected, " + "(%s) - re-start reading from beginning (data %lld)", + pThis->pszCurrFName, reason, data); + DBGPRINTF("checkTruncation, file %s last buffer CHANGED\n", pThis->pszCurrFName); + CHKiRet(strmCloseFile(pThis)); + CHKiRet(strmOpenFile(pThis)); + iRet = RS_RET_FILE_TRUNCATED; + +finalize_it: + RETiRet; +} +/* helper to read: + * Check if file has been truncated since last read and, if so, re-set reading + * to begin of file. To detect truncation, we try to re-read the last block. 
+ * If that does not succeed or different data than from the original read is + * returned, truncation is assumed. + * NOTE: this function must be called only if truncation is enabled AND + * when the previous read buffer still is valid (aka "before the next read"). + * It is ok to call with a 0-size buffer, which we than assume as begin of + * reading. In that case, no truncation will be detected. + * rgerhards, 2018-09-20 + */ +static rsRetVal ATTR_NONNULL() +checkTruncation(strm_t *const pThis) +{ + DEFiRet; + off64_t ret; + assert(pThis->bReopenOnTruncate); + assert(pThis->fd != -1); + + DBGPRINTF("checkTruncation, file %s, iBufPtrMax %zd\n", pThis->pszCurrFName, pThis->iBufPtrMax); + if(pThis->iBufPtrMax == 0) { + FINALIZE; + } + + const off64_t backseek = -1 * (off64_t) pThis->iBufPtrMax; + ret = lseek64(pThis->fd, backseek, SEEK_CUR); + if(ret < 0) { + iRet = rereadTruncated(pThis, errno, + "cannot seek backward to begin of last block", backseek); + FINALIZE; + } + + const ssize_t lenRead = read(pThis->fd, pThis->pIOBuf_truncation, pThis->iBufPtrMax); + if(lenRead != (ssize_t) pThis->iBufPtrMax) { + iRet = rereadTruncated(pThis, errno, + "last block could not be re-read", lenRead); + FINALIZE; + } + + if(!memcmp(pThis->pIOBuf_truncation, pThis->pIOBuf, pThis->iBufPtrMax)) { + DBGPRINTF("checkTruncation, file %s last buffer unchanged\n", pThis->pszCurrFName); + } else { + iRet = rereadTruncated(pThis, errno, "last block data different", 0); + } + +finalize_it: + RETiRet; +} + + +/* read the next buffer from disk + * rgerhards, 2008-02-13 + */ +static rsRetVal +strmReadBuf(strm_t *pThis, int *padBytes) +{ + DEFiRet; + int bRun; + long iLenRead; + size_t actualDataLen; + size_t toRead; + ssize_t bytesLeft; + + ISOBJ_TYPE_assert(pThis, strm); + /* We need to try read at least twice because we may run into EOF and need to switch files. */ + bRun = 1; + while(bRun) { + /* first check if we need to (re)open the file. 
We may have switched to a new one in + * circular mode or it may have been rewritten (rotated) if we monitor a file + * rgerhards, 2008-02-13 + */ + CHKiRet(strmOpenFile(pThis)); + if(pThis->cryprov == NULL) { + toRead = pThis->sIOBufSize; + } else { + CHKiRet(pThis->cryprov->GetBytesLeftInBlock(pThis->cryprovFileData, &bytesLeft)); + if(bytesLeft == -1 || bytesLeft > (ssize_t) pThis->sIOBufSize) { + toRead = pThis->sIOBufSize; + } else { + toRead = (size_t) bytesLeft; + } + } + if(pThis->bReopenOnTruncate) { + rsRetVal localRet = checkTruncation(pThis); + if(localRet == RS_RET_FILE_TRUNCATED) { + continue; + } + CHKiRet(localRet); + } + iLenRead = read(pThis->fd, pThis->pIOBuf, toRead); + DBGOPRINT((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead); + DBGOPRINT((obj_t*) pThis, "file %d read %*s\n", pThis->fd, (unsigned) iLenRead, (char*) pThis->pIOBuf); + /* end crypto */ + if(iLenRead == 0) { + CHKiRet(strmHandleEOF(pThis)); + } else if(iLenRead < 0) + ABORT_FINALIZE(RS_RET_IO_ERROR); + else { /* good read */ + /* here we place our crypto interface */ + if(pThis->cryprov != NULL) { + actualDataLen = iLenRead; + pThis->cryprov->Decrypt(pThis->cryprovFileData, pThis->pIOBuf, &actualDataLen); + *padBytes = iLenRead - actualDataLen; + iLenRead = actualDataLen; + DBGOPRINT((obj_t*) pThis, "encrypted file %d pad bytes %d, actual " + "data %ld\n", pThis->fd, *padBytes, iLenRead); + } else { + *padBytes = 0; + } + pThis->iBufPtrMax = iLenRead; + bRun = 0; /* exit loop */ + } + } + /* if we reach this point, we had a good read */ + pThis->iBufPtr = 0; + +finalize_it: + RETiRet; +} + + +/* debug output of current buffer */ +void +strmDebugOutBuf(const strm_t *const pThis) +{ + int strtIdx = pThis->iBufPtr - 50; + if(strtIdx < 0) + strtIdx = 0; + DBGOPRINT((obj_t*) pThis, "strmRead ungetc %d, index %zd, max %zd, buf '%.*s', CURR: '%.*s'\n", + pThis->iUngetC, pThis->iBufPtr, pThis->iBufPtrMax, (int) pThis->iBufPtrMax - strtIdx, + pThis->pIOBuf+strtIdx, (int) 
(pThis->iBufPtrMax - pThis->iBufPtr), pThis->pIOBuf+pThis->iBufPtr); +} + +/* logically "read" a character from a file. What actually happens is that + * data is taken from the buffer. Only if the buffer is full, data is read + * directly from file. In that case, a read is performed blockwise. + * rgerhards, 2008-01-07 + * NOTE: needs to be enhanced to support sticking with a strm entry (if not + * deleted). + */ +static rsRetVal strmReadChar(strm_t *pThis, uchar *pC) +{ + int padBytes = 0; /* in crypto mode, we may have some padding (non-data) bytes */ + DEFiRet; + + assert(pThis != NULL); + assert(pC != NULL); + + /* DEV debug only: DBGOPRINT((obj_t*) pThis, "strmRead index %zd, max %zd\n", pThis->iBufPtr, + pThis->iBufPtrMax); */ + if(pThis->iUngetC != -1) { /* do we have an "unread" char that we need to provide? */ + *pC = pThis->iUngetC; + ++pThis->iCurrOffs; /* one more octet read */ + pThis->iUngetC = -1; + ABORT_FINALIZE(RS_RET_OK); + } + + /* do we need to obtain a new buffer? */ + if(pThis->iBufPtr >= pThis->iBufPtrMax) { + CHKiRet(strmReadBuf(pThis, &padBytes)); + } + pThis->iCurrOffs += padBytes; + + /* if we reach this point, we have data available in the buffer */ + + *pC = pThis->pIOBuf[pThis->iBufPtr++]; + ++pThis->iCurrOffs; /* one more octet read */ + +finalize_it: + RETiRet; +} + + +/* unget a single character just like ungetc(). As with that call, there is only a single + * character buffering capability. + * rgerhards, 2008-01-07 + */ +static rsRetVal strmUnreadChar(strm_t *pThis, uchar c) +{ + assert(pThis != NULL); + assert(pThis->iUngetC == -1); + pThis->iUngetC = c; + --pThis->iCurrOffs; /* one less octet read - NOTE: this can cause problems if we got a file change + and immediately do an unread and the file is on a buffer boundary and the stream is then persisted. + With the queue, this can not happen as an Unread is only done on record begin, which is never split + accross files. For other cases we accept the very remote risk. 
-- rgerhards, 2008-01-12 */ + + return RS_RET_OK; +} + +/* read a 'paragraph' from a strm file. + * A paragraph may be terminated by a LF, by a LFLF, or by LF depending on the option set. + * The termination LF characters are read, but are + * not returned in the buffer (it is discared). The caller is responsible for + * destruction of the returned CStr object! -- dlang 2010-12-13 + * + * Parameter mode controls legacy multi-line processing: + * mode = 0 single line mode (equivalent to ReadLine) + * mode = 1 LFLF mode (paragraph, blank line between entries) + * mode = 2 LF mode, a log line starts at the beginning of + * a line, but following lines that are indented are part of the same log entry + */ +static rsRetVal ATTR_NONNULL(1, 2) +strmReadLine(strm_t *const pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, + const uchar *const escapeLFString, uint32_t trimLineOverBytes, int64 *const strtOffs) +{ + uchar c; + uchar finished; + const int escapeLFString_len = (escapeLFString == NULL) ? 
4 : strlen((char*) escapeLFString); + DEFiRet; + + assert(pThis != NULL); + assert(ppCStr != NULL); + + CHKiRet(cstrConstruct(ppCStr)); + CHKiRet(strmReadChar(pThis, &c)); + + /* append previous message to current message if necessary */ + if(pThis->prevLineSegment != NULL) { + cstrFinalize(pThis->prevLineSegment); + dbgprintf("readLine: have previous line segment: '%s'\n", + rsCStrGetSzStrNoNULL(pThis->prevLineSegment)); + CHKiRet(cstrAppendCStr(*ppCStr, pThis->prevLineSegment)); + cstrDestruct(&pThis->prevLineSegment); + } + if(mode == 0) { + while(c != '\n') { + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } + if (trimLineOverBytes > 0 && (uint32_t) cstrLen(*ppCStr) > trimLineOverBytes) { + /* Truncate long line at trimLineOverBytes position */ + dbgprintf("Truncate long line at %u, mode %d\n", trimLineOverBytes, mode); + rsCStrTruncate(*ppCStr, cstrLen(*ppCStr) - trimLineOverBytes); + cstrAppendChar(*ppCStr, '\n'); + } + cstrFinalize(*ppCStr); + } else if(mode == 1) { + finished=0; + while(finished == 0){ + if(c != '\n') { + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + pThis->bPrevWasNL = 0; + } else { + if ((((*ppCStr)->iStrLen) > 0) ){ + if(pThis->bPrevWasNL && escapeLFString_len > 0) { + rsCStrTruncate(*ppCStr, (bEscapeLF) ? 
escapeLFString_len : 1); + /* remove the prior newline */ + finished=1; + } else { + if(bEscapeLF) { + if(escapeLFString == NULL) { + CHKiRet(rsCStrAppendStrWithLen(*ppCStr, + (uchar*)"#012", sizeof("#012")-1)); + } else { + CHKiRet(rsCStrAppendStrWithLen(*ppCStr, + escapeLFString, escapeLFString_len)); + } + } else { + CHKiRet(cstrAppendChar(*ppCStr, c)); + } + CHKiRet(strmReadChar(pThis, &c)); + pThis->bPrevWasNL = 1; + } + } else { + finished=1; /* this is a blank line, a \n with nothing since + the last complete record */ + } + } + } + cstrFinalize(*ppCStr); + pThis->bPrevWasNL = 0; + } else if(mode == 2) { + /* indented follow-up lines */ + finished=0; + while(finished == 0){ + if ((*ppCStr)->iStrLen == 0){ + if(c != '\n') { + /* nothing in the buffer, and it's not a newline, add it to the buffer */ + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } else { + finished=1; /* this is a blank line, a \n with nothing since the + last complete record */ + } + } else { + if(pThis->bPrevWasNL) { + if ((c == ' ') || (c == '\t')){ + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + pThis->bPrevWasNL = 0; + } else { + /* clean things up by putting the character we just read back into + * the input buffer and removing the LF character that is + * currently at the + * end of the output string */ + CHKiRet(strmUnreadChar(pThis, c)); + if(bEscapeLF && escapeLFString_len > 0) { + rsCStrTruncate(*ppCStr, (bEscapeLF) ? 
escapeLFString_len : 1); + } + finished=1; + } + } else { /* not the first character after a newline, add it to the buffer */ + if(c == '\n') { + pThis->bPrevWasNL = 1; + if(bEscapeLF && escapeLFString_len > 0) { + if(escapeLFString == NULL) { + CHKiRet(rsCStrAppendStrWithLen(*ppCStr, + (uchar*)"#012", sizeof("#012")-1)); + } else { + CHKiRet(rsCStrAppendStrWithLen(*ppCStr, + escapeLFString, escapeLFString_len)); + } + } else { + CHKiRet(cstrAppendChar(*ppCStr, c)); + } + } else { + CHKiRet(cstrAppendChar(*ppCStr, c)); + } + CHKiRet(strmReadChar(pThis, &c)); + } + } + } + if (trimLineOverBytes > 0 && (uint32_t) cstrLen(*ppCStr) > trimLineOverBytes) { + /* Truncate long line at trimLineOverBytes position */ + dbgprintf("Truncate long line at %u, mode %d\n", trimLineOverBytes, mode); + rsCStrTruncate(*ppCStr, cstrLen(*ppCStr) - trimLineOverBytes); + cstrAppendChar(*ppCStr, '\n'); + } + cstrFinalize(*ppCStr); + pThis->bPrevWasNL = 0; + } + +finalize_it: + if(iRet == RS_RET_OK) { + if(strtOffs != NULL) { + *strtOffs = pThis->strtOffs; + } + pThis->strtOffs = pThis->iCurrOffs; /* we are at begin of next line */ + } else { +DBGPRINTF("RGER: strmReadLine iRet %d\n", iRet); + if(*ppCStr != NULL) { + if(cstrLen(*ppCStr) > 0) { + /* we may have an empty string in an unsuccesfull poll or after restart! */ + if(rsCStrConstructFromCStr(&pThis->prevLineSegment, *ppCStr) != RS_RET_OK) { + /* we cannot do anything against this, but we can at least + * ensure we do not have any follow-on errors. + */ + pThis->prevLineSegment = NULL; + } + } + cstrDestruct(ppCStr); + } + } + + RETiRet; +} + +/* check if the current multi line read is timed out + * @return 0 - no timeout, something else - timeout + */ +int +strmReadMultiLine_isTimedOut(const strm_t *const __restrict__ pThis) +{ + /* note: order of evaluation is choosen so that the most inexpensive + * processing flow is used. 
+ */ + DBGPRINTF("strmReadMultiline_isTimedOut: prevMsgSeg %p, readTimeout %d, " + "lastRead %lld\n", pThis->prevMsgSegment, pThis->readTimeout, + (long long) pThis->lastRead); + return( (pThis->readTimeout) + && (pThis->prevMsgSegment != NULL) + && (getTime(NULL) > pThis->lastRead + pThis->readTimeout) ); +} + +/* read a multi-line message from a strm file. + * The multi-line message is terminated based on the user-provided + * startRegex or endRegex (Posix ERE). For performance reasons, the regex + * must already have been compiled by the user. + * added 2015-05-12 rgerhards + */ +rsRetVal ATTR_NONNULL(1,2) +strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *start_preg, regex_t *end_preg, const sbool bEscapeLF, + const uchar *const escapeLFString, const sbool discardTruncatedMsg, const sbool msgDiscardingError, + int64 *const strtOffs) +{ + uchar c; + uchar finished = 0; + cstr_t *thisLine = NULL; + rsRetVal readCharRet; + const time_t tCurr = pThis->readTimeout ? getTime(NULL) : 0; + size_t maxMsgSize = glblGetMaxLine(runConf); + DEFiRet; + + do { + CHKiRet(strmReadChar(pThis, &c)); /* immediately exit on EOF */ + pThis->lastRead = tCurr; + CHKiRet(cstrConstruct(&thisLine)); + /* append previous message to current message if necessary */ + if(pThis->prevLineSegment != NULL) { + CHKiRet(cstrAppendCStr(thisLine, pThis->prevLineSegment)); + cstrDestruct(&pThis->prevLineSegment); + } + + while(c != '\n') { + CHKiRet(cstrAppendChar(thisLine, c)); + readCharRet = strmReadChar(pThis, &c); + if(readCharRet == RS_RET_EOF) {/* end of file reached without \n? */ + CHKiRet(rsCStrConstructFromCStr(&pThis->prevLineSegment, thisLine)); + } + CHKiRet(readCharRet); + } + cstrFinalize(thisLine); + + /* we have a line, now let's assemble the message */ + const int isStartMatch = start_preg ? + !regexec(start_preg, (char*)rsCStrGetSzStrNoNULL(thisLine), 0, NULL, 0) : + 0; + const int isEndMatch = end_preg ? 
+ !regexec(end_preg, (char*)rsCStrGetSzStrNoNULL(thisLine), 0, NULL, 0) : + 0; + + if(isStartMatch) { + /* in this case, the *previous* message is complete and we are + * at the start of a new one. + */ + if(pThis->ignoringMsg == 0) { + if(pThis->prevMsgSegment != NULL) { + /* may be NULL in initial poll! */ + finished = 1; + *ppCStr = pThis->prevMsgSegment; + } + } + CHKiRet(rsCStrConstructFromCStr(&pThis->prevMsgSegment, thisLine)); + pThis->ignoringMsg = 0; + } else { + if(pThis->ignoringMsg == 0) { + if(pThis->prevMsgSegment == NULL) { + /* may be NULL in initial poll or after timeout! */ + CHKiRet(rsCStrConstructFromCStr(&pThis->prevMsgSegment, thisLine)); + } else { + if(bEscapeLF) { + if(escapeLFString == NULL) { + rsCStrAppendStrWithLen(pThis->prevMsgSegment, (uchar*)"\\n", 2); + } else { + rsCStrAppendStr(pThis->prevMsgSegment, escapeLFString); + } + } else { + cstrAppendChar(pThis->prevMsgSegment, '\n'); + } + + + size_t currLineLen = cstrLen(thisLine); + if(currLineLen > 0) { + size_t len; + if((len = cstrLen(pThis->prevMsgSegment) + currLineLen) < + maxMsgSize) { + CHKiRet(cstrAppendCStr(pThis->prevMsgSegment, thisLine)); + /* we could do this faster, but for now keep it simple */ + } else { + if (cstrLen(pThis->prevMsgSegment) > maxMsgSize) { + len = 0; + } else { + len = currLineLen-(len-maxMsgSize); + for(size_t z=0; zprevMsgSegment, + thisLine->pBuf[z]); + } + } + finished = 1; + *ppCStr = pThis->prevMsgSegment; + CHKiRet(rsCStrConstructFromszStr(&pThis->prevMsgSegment, + thisLine->pBuf+len)); + if(discardTruncatedMsg == 1) { + pThis->ignoringMsg = 1; + } + if(msgDiscardingError == 1) { + if(discardTruncatedMsg == 1) { + LogError(0, RS_RET_ERR, + "imfile error: message received is " + "larger than max msg size; " + "rest of message will not be " + "processed"); + } else { + LogError(0, RS_RET_ERR, + "imfile error: message received is " + "larger than max msg size; message " + "will be split and processed as " + "another message"); + } + } + } + } + 
} + } + } + if(isEndMatch) { + /* in this case, the *current* message is complete and we are + * at the end of it. + */ + if(pThis->ignoringMsg == 0) { + if(pThis->prevMsgSegment != NULL) { + finished = 1; + *ppCStr = pThis->prevMsgSegment; + pThis->prevMsgSegment= NULL; + } + } + pThis->ignoringMsg = 0; + } + cstrDestruct(&thisLine); + } while(finished == 0); + +finalize_it: + *strtOffs = pThis->strtOffs; + if(thisLine != NULL) { + cstrDestruct(&thisLine); + } + if(iRet == RS_RET_OK) { + pThis->strtOffs = pThis->iCurrOffs; /* we are at begin of next line */ + cstrFinalize(*ppCStr); + } else { + if( pThis->readTimeout + && (pThis->prevMsgSegment != NULL) + && (tCurr > pThis->lastRead + pThis->readTimeout)) { + if(rsCStrConstructFromCStr(ppCStr, pThis->prevMsgSegment) == RS_RET_OK) { + cstrFinalize(*ppCStr); + cstrDestruct(&pThis->prevMsgSegment); + pThis->lastRead = tCurr; + pThis->strtOffs = pThis->iCurrOffs; /* we are at begin of next line */ + dbgprintf("stream: generated msg based on timeout: %s\n", + cstrGetSzStrNoNULL(*ppCStr)); + iRet = RS_RET_OK; + } + } + } + RETiRet; +} + +/* Standard-Constructor for the strm object + */ +BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! 
*/ + pThis->iCurrFNum = 1; + pThis->fd = -1; + pThis->fdDir = -1; + pThis->iUngetC = -1; + pThis->bVeryReliableZip = 0; + pThis->sType = STREAMTYPE_FILE_SINGLE; + pThis->sIOBufSize = glblGetIOBufSize(); + pThis->tOpenMode = 0600; + pThis->compressionDriver = STRM_COMPRESS_ZIP; + pThis->pszSizeLimitCmd = NULL; + pThis->prevLineSegment = NULL; + pThis->prevMsgSegment = NULL; + pThis->strtOffs = 0; + pThis->ignoringMsg = 0; + pThis->bPrevWasNL = 0; + pThis->fileNotFoundError = 1; + pThis->noRepeatedErrorOutput = 0; + pThis->lastRead = getTime(NULL); +ENDobjConstruct(strm) + + +/* ConstructionFinalizer + * rgerhards, 2008-01-09 + */ +static rsRetVal strmConstructFinalize(strm_t *pThis) +{ + pthread_mutexattr_t mutAttr; + rsRetVal localRet; + int i; + DEFiRet; + + assert(pThis != NULL); + + pThis->iBufPtrMax = 0; /* results in immediate read request */ + if(pThis->iZipLevel) { /* do we need a zip buf? */ + if(pThis->compressionDriver == STRM_COMPRESS_ZSTD) { + localRet = objUse(zstdw, LM_ZSTDW_FILENAME); + if(localRet != RS_RET_OK) { + pThis->iZipLevel = 0; + LogError(0, localRet, "stream was requested with zstd compression mode, " + "but zstdw module unavailable - using without compression\n"); + } + } else { + assert(pThis->compressionDriver == STRM_COMPRESS_ZIP); + localRet = objUse(zlibw, LM_ZLIBW_FILENAME); + if(localRet != RS_RET_OK) { + pThis->iZipLevel = 0; + LogError(0, localRet, "stream was requested with zip mode, but zlibw " + "module unavailable - using without zip\n"); + } + } + /* we use the same size as the original buf, as we would like + * to make sure we can write out everything with a SINGLE api call! + * We add another 128 bytes to take care of the gzip header and "all eventualities". 
+ */ + CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(pThis->sIOBufSize + 128)); + } + + /* if we are set to sync, we must obtain a file handle to the directory for fsync() purposes */ + if(pThis->bSync && !pThis->bIsTTY && pThis->pszDir != NULL) { + pThis->fdDir = open((char*)pThis->pszDir, O_RDONLY | O_CLOEXEC | O_NOCTTY); + if(pThis->fdDir == -1) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("error %d opening directory file for fsync() use - fsync for directory " + "disabled: %s\n", errno, errStr); + } + } + + /* if we have a flush interval, we need to do async writes in any case */ + if(pThis->iFlushInterval != 0) { + pThis->bAsyncWrite = 1; + } + + DBGPRINTF("file stream %s params: flush interval %d, async write %d\n", + getFileDebugName(pThis), + pThis->iFlushInterval, pThis->bAsyncWrite); + + /* if we work asynchronously, we need a couple of synchronization objects */ + if(pThis->bAsyncWrite) { + /* the mutex must be recursive, because objects may call into other + * object identifiers recursively. 
+ */ + pthread_mutexattr_init(&mutAttr); + pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&pThis->mut, &mutAttr); + pthread_cond_init(&pThis->notFull, 0); + pthread_cond_init(&pThis->notEmpty, 0); + pthread_cond_init(&pThis->isEmpty, 0); + pThis->iCnt = pThis->iEnq = pThis->iDeq = 0; + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + CHKmalloc(pThis->asyncBuf[i].pBuf = (uchar*) malloc(pThis->sIOBufSize)); + } + pThis->pIOBuf = pThis->asyncBuf[0].pBuf; + pThis->bStopWriter = 0; + if(pthread_create(&pThis->writerThreadID, + &default_thread_attr, + asyncWriterThread, pThis) != 0) + DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); + } else { + /* we work synchronously, so we need to alloc a fixed pIOBuf */ + CHKmalloc(pThis->pIOBuf = (uchar*) malloc(pThis->sIOBufSize)); + CHKmalloc(pThis->pIOBuf_truncation = (char*) malloc(pThis->sIOBufSize)); + } + +finalize_it: + RETiRet; +} + + +/* destructor for the strm object */ +BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ + int i; +CODESTARTobjDestruct(strm) + /* we need to stop the ZIP writer */ + if(pThis->bAsyncWrite) + /* Note: mutex will be unlocked in strmCloseFile/stopWriter! */ + d_pthread_mutex_lock(&pThis->mut); + + /* strmClose() will handle read-only files as well as need to open + * files that have unwritten buffers. -- rgerhards, 2010-03-09 + */ + strmCloseFile(pThis); + + if(pThis->bAsyncWrite) { + pthread_mutex_destroy(&pThis->mut); + pthread_cond_destroy(&pThis->notFull); + pthread_cond_destroy(&pThis->notEmpty); + pthread_cond_destroy(&pThis->isEmpty); + for(i = 0 ; i < STREAM_ASYNC_NUMBUFS ; ++i) { + free(pThis->asyncBuf[i].pBuf); + } + } else { + free(pThis->pIOBuf); + free(pThis->pIOBuf_truncation); + } + + /* Finally, we can free the resources. + * IMPORTANT: we MUST free this only AFTER the ansyncWriter has been stopped, else + * we get random errors... 
+ */ + if(pThis->compressionDriver == STRM_COMPRESS_ZSTD) { + zstdw.Destruct(pThis); + } + if(pThis->prevLineSegment) + cstrDestruct(&pThis->prevLineSegment); + if(pThis->prevMsgSegment) + cstrDestruct(&pThis->prevMsgSegment); + free(pThis->pszDir); + free(pThis->pZipBuf); + free(pThis->pszCurrFName); + free(pThis->pszFName); + free(pThis->pszSizeLimitCmd); + pThis->bStopWriter = 2; /* RG: use as flag for destruction */ +ENDobjDestruct(strm) + + +/* check if we need to open a new file (in output mode only). + * The decision is based on file size AND record delimition state. + * This method may also be called on a closed file, in which case + * it immediately returns. + */ +static rsRetVal strmCheckNextOutputFile(strm_t *pThis) +{ + DEFiRet; + + if(pThis->fd == -1 || pThis->sType != STREAMTYPE_FILE_CIRCULAR) + FINALIZE; + + /* wait for output to be empty, so that our counts are correct */ + strmWaitAsyncWriterDone(pThis); + + if(pThis->iCurrOffs >= pThis->iMaxFileSize) { + DBGOPRINT((obj_t*) pThis, "max file size %ld reached for %d, now %ld - starting new file\n", + (long) pThis->iMaxFileSize, pThis->fd, (long) pThis->iCurrOffs); + CHKiRet(strmNextFile(pThis)); + } + +finalize_it: + RETiRet; +} + + +/* try to recover a tty after a write error. This may have happend + * due to vhangup(), and, if so, we can simply re-open it. 
+ */ +#ifdef linux +# define ERR_TTYHUP EIO +#else +# define ERR_TTYHUP EBADF +#endif +static rsRetVal +tryTTYRecover(strm_t *pThis, int err) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); +#ifndef __FreeBSD__ + if(err == ERR_TTYHUP) { +#else + /* Try to reopen our file descriptor even on errno 6, FreeBSD bug 200429 + * Also try on errno 5, FreeBSD bug 211033 + */ + if(err == ERR_TTYHUP || err == ENXIO || err == EIO) { +#endif /* __FreeBSD__ */ + close(pThis->fd); + pThis->fd = -1; + CHKiRet(doPhysOpen(pThis)); + } + +finalize_it: + RETiRet; +} +#undef ER_TTYHUP + + +/* issue write() api calls until either the buffer is completely + * written or an error occurred (it may happen that multiple writes + * are required, what is perfectly legal. On exit, *pLenBuf contains + * the number of bytes actually written. + * rgerhards, 2009-06-08 + */ +static rsRetVal ATTR_NONNULL(1,2,3) +doWriteCall(strm_t *pThis, uchar *pBuf, size_t *pLenBuf) +{ + ssize_t lenBuf; + ssize_t iTotalWritten; + ssize_t iWritten; + char *pWriteBuf; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); +#ifdef __FreeBSD__ + sbool crnlNow = 0; +#endif /* __FreeBSD__ */ + + lenBuf = *pLenBuf; + pWriteBuf = (char*) pBuf; + iTotalWritten = 0; + do { + #ifdef __FreeBSD__ + if (pThis->bIsTTY && !pThis->iZipLevel && !pThis->cryprov) { + char *pNl = NULL; + if (crnlNow == 0) pNl = strchr(pWriteBuf, '\n'); + else crnlNow = 0; + if (pNl == pWriteBuf) { + iWritten = write(pThis->fd, "\r", 1); + if (iWritten > 0) { + crnlNow = 1; + iWritten = 0; + } + } else iWritten = write(pThis->fd, pWriteBuf, pNl ? pNl - pWriteBuf : lenBuf); + } else + #endif /* __FreeBSD__ */ + iWritten = write(pThis->fd, pWriteBuf, lenBuf); + if(iWritten < 0) { + const int err = errno; + iWritten = 0; /* we have written NO bytes! 
*/ + if(err == EBADF) { + DBGPRINTF("file %s: errno %d, fd %d no longer valid, recovery by " + "reopen; if you see this, consider reporting at " + "https://github.com/rsyslog/rsyslog/issues/3404 " + "so that we know when it happens. Include output of uname -a. " + "OS error reason", pThis->pszCurrFName, err, pThis->fd); + pThis->fd = -1; + CHKiRet(doPhysOpen(pThis)); + } else { + if(err != EINTR) { + LogError(err, RS_RET_IO_ERROR, "file '%s'[%d] write error - see " + "https://www.rsyslog.com/solving-rsyslog-write-errors/ for help " + "OS error", pThis->pszCurrFName, pThis->fd); + } + if(err == EINTR) { + /*NO ERROR, just continue */; + } else if( !pThis->bIsTTY && ( err == ENOTCONN || err == EIO )) { + /* Failure for network file system, thus file needs to be closed + * and reopened. */ + close(pThis->fd); + pThis->fd = -1; + CHKiRet(doPhysOpen(pThis)); + } else { + if(pThis->bIsTTY) { + CHKiRet(tryTTYRecover(pThis, err)); + } else { + ABORT_FINALIZE(RS_RET_IO_ERROR); + /* Would it make sense to cover more error cases? So far, I + * do not see good reason to do so. + */ + } + } + } + } + /* advance buffer to next write position */ + iTotalWritten += iWritten; + lenBuf -= iWritten; + pWriteBuf += iWritten; + } while(lenBuf > 0); /* Warning: do..while()! */ + + DBGOPRINT((obj_t*) pThis, "file %d write wrote %d bytes\n", pThis->fd, (int) iWritten); + +finalize_it: + *pLenBuf = iTotalWritten; + RETiRet; +} + + + +/* write memory buffer to a stream object. 
+ */ +static rsRetVal +doWriteInternal(strm_t *pThis, uchar *pBuf, const size_t lenBuf, const int bFlush) +{ + DEFiRet; + + DBGOPRINT((obj_t*) pThis, "file %d(%s) doWriteInternal: bFlush %d\n", + pThis->fd, getFileDebugName(pThis), bFlush); + + if(pThis->iZipLevel) { + CHKiRet(doZipWrite(pThis, pBuf, lenBuf, bFlush)); + } else { + /* write without zipping */ + CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); + } + +finalize_it: + RETiRet; +} + + +/* This function is called to "do" an async write call, what primarily means that + * the data is handed over to the writer thread (which will then do the actual write + * in parallel). Note that the stream mutex has already been locked by the + * strmWrite...() calls. Also note that we always have only a single producer, + * so we can simply serially assign the next free buffer to it and be sure that + * the very some producer comes back in sequence to submit the then-filled buffers. + * This also enables us to timout on partially written buffers. -- rgerhards, 2009-07-06 + */ +static rsRetVal +doAsyncWriteInternal(strm_t *pThis, size_t lenBuf, const int bFlushZip) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + DBGOPRINT((obj_t*) pThis, "file %d(%s) doAsyncWriteInternal at begin: " + "iCnt %d, iEnq %d, bFlushZip %d\n", + pThis->fd, getFileDebugName(pThis), + pThis->iCnt, pThis->iEnq, bFlushZip); + /* the -1 below is important, because we need one buffer for the main thread! 
*/ + while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS - 1) + d_pthread_cond_wait(&pThis->notFull, &pThis->mut); + + pThis->asyncBuf[pThis->iEnq % STREAM_ASYNC_NUMBUFS].lenBuf = lenBuf; + pThis->pIOBuf = pThis->asyncBuf[++pThis->iEnq % STREAM_ASYNC_NUMBUFS].pBuf; + if(!pThis->bFlushNow) /* if we already need to flush, do not overwrite */ + pThis->bFlushNow = bFlushZip; + + pThis->bDoTimedWait = 0; /* everything written, no need to timeout partial buffer writes */ + if(++pThis->iCnt == 1) { + pthread_cond_signal(&pThis->notEmpty); + DBGOPRINT((obj_t*) pThis, "doAsyncWriteInternal signaled notEmpty\n"); + } + DBGOPRINT((obj_t*) pThis, "file %d(%s) doAsyncWriteInternal at exit: " + "iCnt %d, iEnq %d, bFlushZip %d\n", + pThis->fd, getFileDebugName(pThis), + pThis->iCnt, pThis->iEnq, bFlushZip); + + RETiRet; +} + + +/* schedule writing to the stream. Depending on our concurrency settings, + * this either directly writes to the stream or schedules writing via + * the background thread. -- rgerhards, 2009-07-07 + */ +static rsRetVal +strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, const int bFlushZip) +{ + DEFiRet; + + assert(pThis != NULL); + + /* we need to reset the buffer pointer BEFORE calling the actual write + * function. Otherwise, in circular mode, the write function will + * potentially close the file, then close will flush and as the + * buffer pointer is nonzero, will re-call into this code here. In + * the end result, we than have a problem (and things are screwed + * up). So we reset the buffer pointer first, and all this can + * not happen. It is safe to do so, because that pointer is NOT + * used inside the write functions. 
-- rgerhads, 2010-03-10 + */ + pThis->iBufPtr = 0; /* we are at the begin of a new buffer */ + if(pThis->bAsyncWrite) { + CHKiRet(doAsyncWriteInternal(pThis, lenBuf, bFlushZip)); + } else { + CHKiRet(doWriteInternal(pThis, pBuf, lenBuf, bFlushZip)); + } + + +finalize_it: + RETiRet; +} + + + +/* This is the writer thread for asynchronous mode. + * -- rgerhards, 2009-07-06 + */ +static void* +asyncWriterThread(void *pPtr) +{ + int iDeq; + struct timespec t; + sbool bTimedOut = 0; + strm_t *pThis = (strm_t*) pPtr; + int err; + uchar thrdName[256] = "rs:"; + ISOBJ_TYPE_assert(pThis, strm); + + ustrncpy(thrdName+3, pThis->pszFName, sizeof(thrdName)-4); + dbgOutputTID((char*)thrdName); +# if defined(HAVE_PRCTL) && defined(PR_SET_NAME) + if(prctl(PR_SET_NAME, (char*)thrdName, 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", "stream writer"); + } +# endif + + d_pthread_mutex_lock(&pThis->mut); + while(1) { /* loop broken inside */ + while(pThis->iCnt == 0) { + DBGOPRINT((obj_t*) pThis, "file %d(%s) asyncWriterThread new iteration, " + "iCnt %d, bTimedOut %d, iFlushInterval %d\n", pThis->fd, + getFileDebugName(pThis), + pThis->iCnt, bTimedOut, pThis->iFlushInterval); + if(pThis->bStopWriter) { + pthread_cond_broadcast(&pThis->isEmpty); + d_pthread_mutex_unlock(&pThis->mut); + goto finalize_it; /* break main loop */ + } + if(bTimedOut && pThis->iBufPtr > 0) { + /* if we timed out, we need to flush pending data */ + strmFlushInternal(pThis, 1); + bTimedOut = 0; + continue; + } + bTimedOut = 0; + if(pThis->bDoTimedWait) { + timeoutComp(&t, pThis->iFlushInterval * 1000); /* 1000 *millisconds* */ + if((err = pthread_cond_timedwait(&pThis->notEmpty, &pThis->mut, &t)) != 0) { + DBGOPRINT((obj_t*) pThis, "file %d(%s) asyncWriterThread timed out\n", + pThis->fd, getFileDebugName(pThis)); + bTimedOut = 1; /* simulate in any case */ + if(err != ETIMEDOUT) { + char errStr[1024]; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("stream async 
writer timeout with error (%d): %s - " + "ignoring\n", err, errStr); + } + } + } else { + d_pthread_cond_wait(&pThis->notEmpty, &pThis->mut); + } + } + + DBGOPRINT((obj_t*) pThis, "file %d(%s) asyncWriterThread awoken, " + "iCnt %d, bTimedOut %d\n", pThis->fd, getFileDebugName(pThis), + pThis->iCnt, bTimedOut); + bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ + + iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; + const int bFlush = (pThis->bFlushNow || bTimedOut) ? 1 : 0; + pThis->bFlushNow = 0; + + /* now we can do the actual write in parallel */ + d_pthread_mutex_unlock(&pThis->mut); + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, bFlush); + // TODO: error check????? 2009-07-06 + d_pthread_mutex_lock(&pThis->mut); + + --pThis->iCnt; + if(pThis->iCnt < STREAM_ASYNC_NUMBUFS) { + pthread_cond_signal(&pThis->notFull); + if(pThis->iCnt == 0) + pthread_cond_broadcast(&pThis->isEmpty); + } + } + /* Not reached */ + +finalize_it: + DBGOPRINT((obj_t*) pThis, "file %d(%s) asyncWriterThread terminated\n", + pThis->fd, getFileDebugName(pThis)); + return NULL; /* to keep pthreads happy */ +} + + +/* sync the file to disk, so that any unwritten data is persisted. This + * also syncs the directory and thus makes sure that the file survives + * fatal failure. Note that we do NOT return an error status if the + * sync fails. Doing so would probably cause more trouble than it + * is worth (read: data loss may occur where we otherwise might not + * have it). 
-- rgerhards, 2009-06-08 + */ +#undef SYNCCALL +#if defined(HAVE_FDATASYNC) && !defined(__APPLE__) +# define SYNCCALL(x) fdatasync(x) +#else +# define SYNCCALL(x) fsync(x) +#endif +static rsRetVal +syncFile(strm_t *pThis) +{ + int ret; + DEFiRet; + + if(pThis->bIsTTY) + FINALIZE; /* TTYs can not be synced */ + + DBGPRINTF("syncing file %d\n", pThis->fd); + ret = SYNCCALL(pThis->fd); + if(ret != 0) { + char errStr[1024]; + int err = errno; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("sync failed for file %d with error (%d): %s - ignoring\n", + pThis->fd, err, errStr); + } + + if(pThis->fdDir != -1) { + if(fsync(pThis->fdDir) != 0) + DBGPRINTF("stream/syncFile: fsync returned error, ignoring\n"); + } + +finalize_it: + RETiRet; +} +#undef SYNCCALL + +/* physically write to the output file. the provided data is ready for + * writing (e.g. zipped if we are requested to do that). + * Note that if the write() API fails, we do not reset any pointers, but return + * an error code. That means we may redo work in the next iteration. 
+ * rgerhards, 2009-06-04 + */ +static rsRetVal +strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +{ + size_t iWritten; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + DBGPRINTF("strmPhysWrite, stream %p, len %u\n", pThis, (unsigned)lenBuf); + if(pThis->fd == -1) + CHKiRet(strmOpenFile(pThis)); + + /* here we place our crypto interface */ + if(pThis->cryprov != NULL) { + pThis->cryprov->Encrypt(pThis->cryprovFileData, pBuf, &lenBuf); + } + /* end crypto */ + + iWritten = lenBuf; + CHKiRet(doWriteCall(pThis, pBuf, &iWritten)); + + pThis->iCurrOffs += iWritten; + /* update user counter, if provided */ + if(pThis->pUsrWCntr != NULL) + *pThis->pUsrWCntr += iWritten; + + if(pThis->bSync) { + CHKiRet(syncFile(pThis)); + } + + if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) { + CHKiRet(strmCheckNextOutputFile(pThis)); + } + +finalize_it: + RETiRet; +} + + +/* write the output buffer in zip mode + * This means we compress it first and then do a physical write. + * Note that we always do a full deflateInit ... deflate ... deflateEnd + * sequence. While this is not optimal, we need to do it because we need + * to ensure that the file is readable even when we are aborted. Doing the + * full sequence brings us as far towards this goal as possible (and not + * doing it would be a total failure). It may be worth considering to + * add a config switch so that the user can decide the risk he is ready + * to take, but so far this is not yet implemented (not even requested ;)). + * rgerhards, 2009-06-04 + */ +static rsRetVal +doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, const int bFlush) +{ + if(pThis->compressionDriver == STRM_COMPRESS_ZSTD) { + return zstdw.doStrmWrite(pThis, pBuf, lenBuf, bFlush, strmPhysWrite); + } else { + return zlibw.doStrmWrite(pThis, pBuf, lenBuf, bFlush, strmPhysWrite); + } +} + + + +/* finish zlib buffer, to be called before closing the ZIP file (if + * running in stream mode). 
+ */ +static rsRetVal +doZipFinish(strm_t *pThis) +{ + if(pThis->compressionDriver == STRM_COMPRESS_ZSTD) { + return zstdw.doCompressFinish(pThis, strmPhysWrite); + } else { + return zlibw.doCompressFinish(pThis, strmPhysWrite); + } +} + +/* flush stream output buffer to persistent storage. This can be called at any time + * and is automatically called when the output buffer is full. + * rgerhards, 2008-01-10 + */ +static rsRetVal +strmFlushInternal(strm_t *pThis, int bFlushZip) +{ + DEFiRet; + + assert(pThis != NULL); + DBGOPRINT((obj_t*) pThis, "strmFlushinternal: file %d(%s) flush, buflen %ld%s\n", pThis->fd, + getFileDebugName(pThis), + (long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : ""); + + if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { + iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr, bFlushZip); + } + + RETiRet; +} + + +/* flush stream output buffer to persistent storage. This can be called at any time + * and is automatically called when the output buffer is full. This function is for + * use by EXTERNAL callers. Do NOT use it internally. It locks the async writer + * mutex if ther is need to do so. + * rgerhards, 2010-03-18 + */ +static rsRetVal +strmFlush(strm_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + + DBGOPRINT((obj_t*) pThis, "file %d strmFlush\n", pThis->fd); + + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + CHKiRet(strmFlushInternal(pThis, 1)); + +finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + + RETiRet; +} + + +/* seek a stream to a specific location. Pending writes are flushed, read data + * is invalidated. 
+ * rgerhards, 2008-01-12 + */ +static rsRetVal ATTR_NONNULL() +strmSeek(strm_t *pThis, const off64_t offs) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + + if(pThis->fd == -1) { + CHKiRet(strmOpenFile(pThis)); + } else { + CHKiRet(strmFlushInternal(pThis, 0)); + } + DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs); + const off64_t i = lseek64(pThis->fd, offs, SEEK_SET); + if(i != offs) { + LogError(errno, RS_RET_IO_ERROR, "file %s: unexpected error seeking to " + "offset %lld (ret %lld) - further malfunctions may happen", + pThis->pszCurrFName, (long long) i, (long long) offs); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + pThis->strtOffs = pThis->iCurrOffs = offs; /* we are now at *this* offset */ + pThis->iBufPtr = 0; /* buffer invalidated */ + +finalize_it: + RETiRet; +} + +/* multi-file seek, seeks to file number & offset within file. This + * is a support function for the queue, in circular mode. DO NOT USE + * IT FOR OTHER NEEDS - it may not work as expected. It will + * seek to the new position and delete interim files, as it skips them. + * Note: this code can be removed when the queue gets a new disk store + * handler (if and when it does ;)). + * The output parameter bytesDel receives the number of bytes that have + * been deleted (if a file is deleted) or 0 if nothing was deleted. 
+ * rgerhards, 2012-11-07 + */ +rsRetVal +strmMultiFileSeek(strm_t *pThis, unsigned int FNum, off64_t offs, off64_t *bytesDel) +{ + struct stat statBuf; + int skipped_files; + DEFiRet; + ISOBJ_TYPE_assert(pThis, strm); + + if(FNum == 0 && offs == 0) { /* happens during queue init */ + *bytesDel = 0; + FINALIZE; + } + + skipped_files = FNum - pThis->iCurrFNum; + *bytesDel = 0; + + while(skipped_files > 0) { + CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, + pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, + pThis->iFileNumDigits)); + dbgprintf("rger: processing file %s\n", pThis->pszCurrFName); + if(stat((char*)pThis->pszCurrFName, &statBuf) != 0) { + LogError(errno, RS_RET_IO_ERROR, "unexpected error doing a stat() " + "on file %s - further malfunctions may happen", + pThis->pszCurrFName); + /* we do NOT error-terminate here as this could worsen the + * situation. As such, we just keep running and try to delete + * as many files as possible. + */ + } + *bytesDel += statBuf.st_size; + DBGPRINTF("strmMultiFileSeek: detected new filenum, was %u, new %u, " + "deleting '%s' (%lld bytes)\n", pThis->iCurrFNum, FNum, + pThis->pszCurrFName, (long long) statBuf.st_size); + unlink((char*)pThis->pszCurrFName); + if(pThis->cryprov != NULL) + pThis->cryprov->DeleteStateFiles(pThis->pszCurrFName); + free(pThis->pszCurrFName); + pThis->pszCurrFName = NULL; + pThis->iCurrFNum++; + --skipped_files; + } + DBGOPRINT((obj_t*) pThis, "strmMultiFileSeek: deleted %lld bytes in this run\n", + (long long) *bytesDel); + pThis->strtOffs = pThis->iCurrOffs = offs; + +finalize_it: + RETiRet; +} + + +/* seek to current offset. This is primarily a helper to readjust the OS file + * pointer after a strm object has been deserialized. 
+ */ +static rsRetVal strmSeekCurrOffs(strm_t *pThis) +{ + off64_t targetOffs; + uchar c; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + + if(pThis->cryprov == NULL || pThis->tOperationsMode != STREAMMODE_READ) { + iRet = strmSeek(pThis, pThis->iCurrOffs); + FINALIZE; + } + + /* As the cryprov may use CBC or similiar things, we need to read skip data */ + targetOffs = pThis->iCurrOffs; + pThis->strtOffs = pThis->iCurrOffs = 0; + DBGOPRINT((obj_t*) pThis, "encrypted, doing skip read of %lld bytes\n", + (long long) targetOffs); + while(targetOffs != pThis->iCurrOffs) { + CHKiRet(strmReadChar(pThis, &c)); + } +finalize_it: + RETiRet; +} + + +/* write a *single* character to a stream object -- rgerhards, 2008-01-10 + */ +static rsRetVal strmWriteChar(strm_t *__restrict__ const pThis, const uchar c) +{ + DEFiRet; + + assert(pThis != NULL); + + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + + /* if the buffer is full, we need to flush before we can write */ + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlushInternal(pThis, 0)); + } + + /* we now always have space for one character, so we simply copy it */ + *(pThis->pIOBuf + pThis->iBufPtr) = c; + pThis->iBufPtr++; + +finalize_it: + if(pThis->bAsyncWrite) + d_pthread_mutex_unlock(&pThis->mut); + + RETiRet; +} + + +/* write an integer value (actually a long) to a stream object + * Note that we do not need to lock the mutex here, because we call + * strmWrite(), which does the lock (aka: we must not lock it, else we + * would run into a recursive lock, resulting in a deadlock!) + */ +static rsRetVal strmWriteLong(strm_t *__restrict__ const pThis, const long i) +{ + DEFiRet; + uchar szBuf[32]; + + assert(pThis != NULL); + + CHKiRet(srUtilItoA((char*)szBuf, sizeof(szBuf), i)); + CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf))); + +finalize_it: + RETiRet; +} + + +/* write memory buffer to a stream object. 
+ * process the data in chunks and copy it over to our buffer. The caller-provided data + * may theoretically be larger than our buffer. In that case, we do multiple copies. One + * may argue if it were more efficient to write out the caller-provided buffer in that case + * and earlier versions of rsyslog did this. However, this introduces a lot of complexity + * inside the buffered writer and potential performance bottlenecks when trying to solve + * it. Now keep in mind that we actually do (almost?) never have a case where the + * caller-provided buffer is larger than our one. So instead of optimizing a case + * which normally does not exist, we expect some degradation in its case but make us + * perform better in the regular cases. -- rgerhards, 2009-07-07 + * Note: the pThis->iBufPtr == pThis->sIOBufSize logic below looks a bit like an + * off-by-one error. In fact, it is not, because iBufPtr always points to the next + * *free* byte in the buffer. So if it is sIOBufSize - 1, there actually is one + * free byte left. This came up during a code walkthrough and was considered + * worth noting.
-- rgerhards, 2010-03-10 + */ +static rsRetVal ATTR_NONNULL(1,2) +strmWrite(strm_t *__restrict__ const pThis, const uchar *__restrict__ const pBuf, size_t lenBuf) +{ + DEFiRet; + size_t iWrite; + size_t iOffset; + + assert(pThis != NULL); + assert(pBuf != NULL); + + if(pThis->bDisabled) + ABORT_FINALIZE(RS_RET_STREAM_DISABLED); + + if(pThis->bAsyncWrite) + d_pthread_mutex_lock(&pThis->mut); + + iOffset = 0; + do { + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */ + } + iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ + if(iWrite > lenBuf) + iWrite = lenBuf; + memcpy(pThis->pIOBuf + pThis->iBufPtr, pBuf + iOffset, iWrite); + pThis->iBufPtr += iWrite; + iOffset += iWrite; + lenBuf -= iWrite; + } while(lenBuf > 0); + + /* now check if the buffer right at the end of the write is full and, if so, + * write it. This seems more natural than waiting (hours?) for the next message... + */ + if(pThis->iBufPtr == pThis->sIOBufSize) { + CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */ + } + if(pThis->fd != -1 && pThis->iSizeLimit != 0) { /* Only check if fd already set */ + CHKiRet(doSizeLimitProcessing(pThis)); + } + + +finalize_it: + if(pThis->bAsyncWrite) { + if(pThis->bDoTimedWait == 0) { + /* we potentially have a partial buffer, so re-activate the + * writer thread that it can set and pick up timeouts. 
+ */ + pThis->bDoTimedWait = 1; + pthread_cond_signal(&pThis->notEmpty); + } + d_pthread_mutex_unlock(&pThis->mut); + } + + RETiRet; +} + + +/* property set methods */ +/* simple ones first */ +DEFpropSetMeth(strm, iMaxFileSize, int64) +DEFpropSetMeth(strm, iFileNumDigits, int) +DEFpropSetMeth(strm, tOperationsMode, int) +DEFpropSetMeth(strm, tOpenMode, mode_t) +DEFpropSetMeth(strm, compressionDriver, strm_compressionDriver_t) +DEFpropSetMeth(strm, sType, strmType_t) +DEFpropSetMeth(strm, iZipLevel, int) +DEFpropSetMeth(strm, bVeryReliableZip, int) +DEFpropSetMeth(strm, bSync, int) +DEFpropSetMeth(strm, bReopenOnTruncate, int) +DEFpropSetMeth(strm, sIOBufSize, size_t) +DEFpropSetMeth(strm, iSizeLimit, off_t) +DEFpropSetMeth(strm, iFlushInterval, int) +DEFpropSetMeth(strm, pszSizeLimitCmd, uchar*) +DEFpropSetMeth(strm, cryprov, cryprov_if_t*) +DEFpropSetMeth(strm, cryprovData, void*) + +/* sets timeout in seconds */ +void ATTR_NONNULL() +strmSetReadTimeout(strm_t *const __restrict__ pThis, const int val) +{ + ISOBJ_TYPE_assert(pThis, strm); + pThis->readTimeout = val; +} + +static rsRetVal ATTR_NONNULL() +strmSetbDeleteOnClose(strm_t *const pThis, const int val) +{ + ISOBJ_TYPE_assert(pThis, strm); + pThis->bDeleteOnClose = val; + if(pThis->cryprov != NULL) { + pThis->cryprov->SetDeleteOnClose(pThis->cryprovFileData, pThis->bDeleteOnClose); + } + return RS_RET_OK; +} + +static rsRetVal ATTR_NONNULL() +strmSetiMaxFiles(strm_t *const pThis, const int iNewVal) +{ + ISOBJ_TYPE_assert(pThis, strm); + pThis->iMaxFiles = iNewVal; + pThis->iFileNumDigits = getNumberDigits(iNewVal); + return RS_RET_OK; +} + +static rsRetVal ATTR_NONNULL() +strmSetFileNotFoundError(strm_t *const pThis, const int pFileNotFoundError) +{ + ISOBJ_TYPE_assert(pThis, strm); + pThis->fileNotFoundError = pFileNotFoundError; + return RS_RET_OK; +} + + +/* set the stream's file prefix + * The passed-in string is duplicated. So if the caller does not need + * it any longer, it must free it. 
+ * rgerhards, 2008-01-09 + */ +static rsRetVal +strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) +{ + DEFiRet; + + assert(pThis != NULL); + assert(pszName != NULL); + + if(iLenName < 1) + ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); + + if(pThis->pszFName != NULL) + free(pThis->pszFName); + + if((pThis->pszFName = malloc(iLenName + 1)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + memcpy(pThis->pszFName, pszName, iLenName + 1); /* always think about the \0! */ + pThis->lenFName = iLenName; + +finalize_it: + RETiRet; +} + + +/* set the stream's directory + * The passed-in string is duplicated. So if the caller does not need + * it any longer, it must free it. + * rgerhards, 2008-01-09 + */ +static rsRetVal +strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) +{ + DEFiRet; + + assert(pThis != NULL); + assert(pszDir != NULL); + + if(iLenDir < 1) + ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); + + CHKmalloc(pThis->pszDir = malloc(iLenDir + 1)); + + memcpy(pThis->pszDir, pszDir, iLenDir + 1); /* always think about the \0! */ + pThis->lenDir = iLenDir; + +finalize_it: + RETiRet; +} + + +/* support for data records + * The stream class is able to write to multiple files. However, there are + * situation (actually quite common), where a single data record should not + * be split across files. This may be problematic if multiple stream write + * calls are used to create the record. To support that, we provide the + * bInRecord status variable. If it is set, no file spliting occurs. Once + * it is set to 0, a check is done if a split is necessary and it then + * happens. For a record-oriented caller, the proper sequence is: + * + * strmRecordBegin() + * strmWrite...() + * strmRecordEnd() + * + * Please note that records do not affect the writing of output buffers. They + * are always written when full. The only thing affected is circular files + * creation. So it is safe to write large records. + * + * IMPORTANT: RecordBegin() can not be nested! 
It is a programming error + * if RecordBegin() is called while already in a record! + * + * rgerhards, 2008-01-10 + */ +static rsRetVal strmRecordBegin(strm_t *pThis) +{ + assert(pThis != NULL); + assert(pThis->bInRecord == 0); + pThis->bInRecord = 1; + return RS_RET_OK; +} + +static rsRetVal strmRecordEnd(strm_t *pThis) +{ + DEFiRet; + assert(pThis != NULL); + assert(pThis->bInRecord == 1); + + pThis->bInRecord = 0; + iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */ + + RETiRet; +} +/* end stream record support functions */ + + +/* This method serializes a stream object. That means the whole + * object is modified into text form. That text form is suitable for + * later reconstruction of the object. + * The most common use case for this method is the creation of an + * on-disk representation of the message object. + * We do not serialize the dynamic properties. + * rgerhards, 2008-01-10 + */ +static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) +{ + DEFiRet; + int i; + int64 l; + + ISOBJ_TYPE_assert(pThis, strm); + ISOBJ_TYPE_assert(pStrm, strm); + + strmFlushInternal(pThis, 0); + CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis)); + + objSerializeSCALAR(pStrm, iCurrFNum, INT); /* implicit cast is OK for persistance */ + objSerializePTR(pStrm, pszFName, PSZ); + objSerializeSCALAR(pStrm, iMaxFiles, INT); + objSerializeSCALAR(pStrm, bDeleteOnClose, INT); + + i = pThis->sType; + objSerializeSCALAR_VAR(pStrm, sType, INT, i); + + i = pThis->tOperationsMode; + objSerializeSCALAR_VAR(pStrm, tOperationsMode, INT, i); + + i = pThis->tOpenMode; + objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i); + + l = pThis->iCurrOffs; + objSerializeSCALAR_VAR(pStrm, iCurrOffs, INT64, l); + + l = pThis->inode; + objSerializeSCALAR_VAR(pStrm, inode, INT64, l); + + l = pThis->strtOffs; + objSerializeSCALAR_VAR(pStrm, strtOffs, INT64, l); + + dbgprintf("strmSerialize: pThis->prevLineSegment %p\n", pThis->prevLineSegment); + if(pThis->prevLineSegment != 
NULL) { + cstrFinalize(pThis->prevLineSegment); + objSerializePTR(pStrm, prevLineSegment, CSTR); + } + + if(pThis->prevMsgSegment != NULL) { + cstrFinalize(pThis->prevMsgSegment); + objSerializePTR(pStrm, prevMsgSegment, CSTR); + } + + i = pThis->bPrevWasNL; + objSerializeSCALAR_VAR(pStrm, bPrevWasNL, INT, i); + + CHKiRet(obj.EndSerialize(pStrm)); + +finalize_it: + RETiRet; +} + + +/* duplicate a stream object excluding dynamic properties. This function is + * primarily meant to provide a duplicate that later on can be used to access + * the data. This is needed, for example, for a restart of the disk queue. + * Note that ConstructFinalize() is NOT called. So our caller may change some + * properties before finalizing things. + * rgerhards, 2009-05-26 + */ +static rsRetVal +strmDup(strm_t *const pThis, strm_t **ppNew) +{ + strm_t *pNew = NULL; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + assert(ppNew != NULL); + + CHKiRet(strmConstruct(&pNew)); + pNew->sType = pThis->sType; + pNew->iCurrFNum = pThis->iCurrFNum; + CHKmalloc(pNew->pszFName = ustrdup(pThis->pszFName)); + pNew->lenFName = pThis->lenFName; + CHKmalloc(pNew->pszDir = ustrdup(pThis->pszDir)); + pNew->lenDir = pThis->lenDir; + pNew->tOperationsMode = pThis->tOperationsMode; + pNew->tOpenMode = pThis->tOpenMode; + pNew->compressionDriver = pThis->compressionDriver; + pNew->iMaxFileSize = pThis->iMaxFileSize; + pNew->iMaxFiles = pThis->iMaxFiles; + pNew->iFileNumDigits = pThis->iFileNumDigits; + pNew->bDeleteOnClose = pThis->bDeleteOnClose; + pNew->iCurrOffs = pThis->iCurrOffs; + + *ppNew = pNew; + pNew = NULL; + +finalize_it: + if(pNew != NULL) + strmDestruct(&pNew); + + RETiRet; +} + + +static rsRetVal +SetCompressionWorkers(strm_t *const pThis, int num_wrkrs) +{ + ISOBJ_TYPE_assert(pThis, strm); + if(num_wrkrs < 0) + num_wrkrs = 1; + pThis->zstd.num_wrkrs = num_wrkrs; + return RS_RET_OK; +} + + +/* set a user write-counter. 
This counter is initialized to zero and + * receives the number of bytes written. It is accurate only after a + * flush(). This hook is provided as a means to control disk size usage. + * The pointer must be valid at all times (so if it is on the stack, be sure + * to remove it when you exit the function). Pointers are removed by + * calling strmSetWCntr() with a NULL param. Only one pointer is settable, + * any new set overwrites the previous one. + * rgerhards, 2008-02-27 + */ +static rsRetVal +strmSetWCntr(strm_t *pThis, number_t *pWCnt) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + + if(pWCnt != NULL) + *pWCnt = 0; + pThis->pUsrWCntr = pWCnt; + + RETiRet; +} + + +#include "stringbuf.h" + +/* This function can be used as a generic way to set properties. + * rgerhards, 2008-01-11 + */ +#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, UCHAR_CONSTANT(name), sizeof(name) - 1) +static rsRetVal strmSetProperty(strm_t *pThis, var_t *pProp) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + assert(pProp != NULL); + + if(isProp("sType")) { + CHKiRet(strmSetsType(pThis, (strmType_t) pProp->val.num)); + } else if(isProp("iCurrFNum")) { + pThis->iCurrFNum = (unsigned) pProp->val.num; + } else if(isProp("pszFName")) { + CHKiRet(strmSetFName(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr))); + } else if(isProp("tOperationsMode")) { + CHKiRet(strmSettOperationsMode(pThis, pProp->val.num)); + } else if(isProp("tOpenMode")) { + CHKiRet(strmSettOpenMode(pThis, pProp->val.num)); + } else if(isProp("iCurrOffs")) { + pThis->iCurrOffs = pProp->val.num; + } else if(isProp("inode")) { + pThis->inode = (ino_t) pProp->val.num; + } else if(isProp("strtOffs")) { + pThis->strtOffs = pProp->val.num; + } else if(isProp("iMaxFileSize")) { + CHKiRet(strmSetiMaxFileSize(pThis, pProp->val.num)); + } else if(isProp("fileNotFoundError")) { + CHKiRet(strmSetFileNotFoundError(pThis, pProp->val.num)); + } else if(isProp("iMaxFiles")) { + 
CHKiRet(strmSetiMaxFiles(pThis, pProp->val.num)); + } else if(isProp("iFileNumDigits")) { + CHKiRet(strmSetiFileNumDigits(pThis, pProp->val.num)); + } else if(isProp("bDeleteOnClose")) { + CHKiRet(strmSetbDeleteOnClose(pThis, pProp->val.num)); + } else if(isProp("prevLineSegment")) { + CHKiRet(rsCStrConstructFromCStr(&pThis->prevLineSegment, pProp->val.pStr)); + } else if(isProp("prevMsgSegment")) { + CHKiRet(rsCStrConstructFromCStr(&pThis->prevMsgSegment, pProp->val.pStr)); + } else if(isProp("bPrevWasNL")) { + pThis->bPrevWasNL = (sbool) pProp->val.num; + } + +finalize_it: + RETiRet; +} +#undef isProp + + +/* return the current offset inside the stream. Note that on two consequtive calls, the offset + * reported on the second call may actually be lower than on the first call. This is due to + * file circulation. A caller must deal with that. -- rgerhards, 2008-01-30 + */ +static rsRetVal +strmGetCurrOffset(strm_t *pThis, int64 *pOffs) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + assert(pOffs != NULL); + + *pOffs = pThis->iCurrOffs; + + RETiRet; +} + + +/* queryInterface function + * rgerhards, 2008-02-29 + */ +BEGINobjQueryInterface(strm) +CODESTARTobjQueryInterface(strm) + if(pIf->ifVersion != strmCURR_IF_VERSION) { /* check for current version, increment on each change */ + ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED); + } + + /* ok, we have the right interface, so let's fill it + * Please note that we may also do some backwards-compatibility + * work here (if we can support an older interface version - that, + * of course, also affects the "if" above). 
+ */ + pIf->Construct = strmConstruct; + pIf->ConstructFinalize = strmConstructFinalize; + pIf->Destruct = strmDestruct; + pIf->ReadChar = strmReadChar; + pIf->UnreadChar = strmUnreadChar; + pIf->ReadLine = strmReadLine; + pIf->SeekCurrOffs = strmSeekCurrOffs; + pIf->Write = strmWrite; + pIf->WriteChar = strmWriteChar; + pIf->WriteLong = strmWriteLong; + pIf->SetFName = strmSetFName; + pIf->SetFileNotFoundError = strmSetFileNotFoundError; + pIf->SetDir = strmSetDir; + pIf->Flush = strmFlush; + pIf->RecordBegin = strmRecordBegin; + pIf->RecordEnd = strmRecordEnd; + pIf->Serialize = strmSerialize; + pIf->GetCurrOffset = strmGetCurrOffset; + pIf->Dup = strmDup; + pIf->SetCompressionWorkers = SetCompressionWorkers; + pIf->SetWCntr = strmSetWCntr; + pIf->CheckFileChange = CheckFileChange; + /* set methods */ + pIf->SetbDeleteOnClose = strmSetbDeleteOnClose; + pIf->SetiMaxFileSize = strmSetiMaxFileSize; + pIf->SetiMaxFiles = strmSetiMaxFiles; + pIf->SetiFileNumDigits = strmSetiFileNumDigits; + pIf->SettOperationsMode = strmSettOperationsMode; + pIf->SettOpenMode = strmSettOpenMode; + pIf->SetcompressionDriver = strmSetcompressionDriver; + pIf->SetsType = strmSetsType; + pIf->SetiZipLevel = strmSetiZipLevel; + pIf->SetbVeryReliableZip = strmSetbVeryReliableZip; + pIf->SetbSync = strmSetbSync; + pIf->SetbReopenOnTruncate = strmSetbReopenOnTruncate; + pIf->SetsIOBufSize = strmSetsIOBufSize; + pIf->SetiSizeLimit = strmSetiSizeLimit; + pIf->SetiFlushInterval = strmSetiFlushInterval; + pIf->SetpszSizeLimitCmd = strmSetpszSizeLimitCmd; + pIf->Setcryprov = strmSetcryprov; + pIf->SetcryprovData = strmSetcryprovData; +finalize_it: +ENDobjQueryInterface(strm) + + +/* Initialize the stream class. Must be called as the very first method + * before anything else is called inside this class. 
+ * rgerhards, 2008-01-09 + */ +BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) + /* request objects we use */ + + OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); + OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); + OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); +ENDObjClassInit(strm) -- cgit v1.2.3