summaryrefslogtreecommitdiffstats
path: root/runtime/tcpsrv.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/tcpsrv.c')
-rw-r--r--runtime/tcpsrv.c1799
1 files changed, 1799 insertions, 0 deletions
diff --git a/runtime/tcpsrv.c b/runtime/tcpsrv.c
new file mode 100644
index 0000000..df9bcec
--- /dev/null
+++ b/runtime/tcpsrv.c
@@ -0,0 +1,1799 @@
+/* tcpsrv.c
+ *
+ * Common code for plain TCP syslog based servers. This is currently being
+ * utilized by imtcp and imgssapi.
+ *
+ * NOTE: this is *not* a generic TCP server, but one for syslog servers. For
+ * generic stream servers, please use ./runtime/strmsrv.c!
+ *
+ * There are actually two classes within the tcpserver code: one is
+ * the tcpsrv itself, the other one is its sessions. This is a helper
+ * class to tcpsrv.
+ *
+ * The common code here calls upon specific functionality by using
+ * callbacks. The specialised input modules need to set the proper
+ * callbacks before the code is run. The tcpsrv then calls back
+ * into the specific input modules at the appropriate time.
+ *
+ * NOTE: read comments in module-template.h to understand how this file
+ * works!
+ *
+ * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c[which was
+ * licensed under BSD at the time of the rsyslog fork])
+ *
+ * Copyright 2007-2022 Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdarg.h>
+#include <ctype.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <signal.h>
+#include <sys/socket.h>
+#if HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
+#include "rsyslog.h"
+#include "dirty.h"
+#include "cfsysline.h"
+#include "module-template.h"
+#include "net.h"
+#include "srUtils.h"
+#include "conf.h"
+#include "tcpsrv.h"
+#include "obj.h"
+#include "glbl.h"
+#include "netstrms.h"
+#include "netstrm.h"
+#include "nssel.h"
+#include "nspoll.h"
+#include "errmsg.h"
+#include "ruleset.h"
+#include "ratelimit.h"
+#include "unicode-helper.h"
+
+PRAGMA_INGORE_Wswitch_enum
+MODULE_TYPE_LIB
+MODULE_TYPE_NOKEEP
+
+/* defines */
+#define TCPSESS_MAX_DEFAULT 200 /* default for nbr of tcp sessions if no number is given */
+#define TCPLSTN_MAX_DEFAULT 20 /* default for nbr of listeners */
+
+/* static data */
+DEFobjStaticHelpers
+DEFobjCurrIf(conf)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(ruleset)
+DEFobjCurrIf(tcps_sess)
+DEFobjCurrIf(net)
+DEFobjCurrIf(netstrms)
+DEFobjCurrIf(netstrm)
+DEFobjCurrIf(nssel)
+DEFobjCurrIf(nspoll)
+DEFobjCurrIf(prop)
+DEFobjCurrIf(statsobj)
+
+static void startWorkerPool(void);
+
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ int idx;
+ tcpsrv_t *pSrv; /* pSrv == NULL -> idle */
+ nspoll_t *pPoll;
+ void *pUsr;
+ sbool enabled;
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[4];
+static sbool bWrkrRunning; /* are the worker threads running? */
+static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
+static int wrkrMax = 4;
+static int wrkrRunning;
+
+/* add new listener port to listener port list
+ * rgerhards, 2009-05-21
+ */
+static rsRetVal ATTR_NONNULL(1, 2)
+addNewLstnPort(tcpsrv_t *const pThis, tcpLstnParams_t *const cnf_params)
+{
+ tcpLstnPortList_t *pEntry;
+ uchar statname[64];
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ /* create entry */
+ CHKmalloc(pEntry = (tcpLstnPortList_t*)calloc(1, sizeof(tcpLstnPortList_t)));
+ pEntry->cnf_params = cnf_params;
+
+ strcpy((char*)pEntry->cnf_params->dfltTZ, (char*)pThis->dfltTZ);
+ pEntry->cnf_params->bSPFramingFix = pThis->bSPFramingFix;
+ pEntry->cnf_params->bPreserveCase = pThis->bPreserveCase;
+ pEntry->pSrv = pThis;
+
+
+ /* support statistics gathering */
+ CHKiRet(ratelimitNew(&pEntry->ratelimiter, "tcperver", NULL));
+ ratelimitSetLinuxLike(pEntry->ratelimiter, pThis->ratelimitInterval, pThis->ratelimitBurst);
+ ratelimitSetThreadSafe(pEntry->ratelimiter);
+
+ CHKiRet(statsobj.Construct(&(pEntry->stats)));
+ snprintf((char*)statname, sizeof(statname), "%s(%s)", cnf_params->pszInputName, cnf_params->pszPort);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(pEntry->stats, statname));
+ CHKiRet(statsobj.SetOrigin(pEntry->stats, pThis->pszOrigin));
+ STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit);
+ CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pEntry->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(pEntry->stats));
+
+ /* all OK - add to list */
+ pEntry->pNext = pThis->pLstnPorts;
+ pThis->pLstnPorts = pEntry;
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pEntry != NULL) {
+ if(pEntry->cnf_params->pInputName != NULL) {
+ prop.Destruct(&pEntry->cnf_params->pInputName);
+ }
+ if(pEntry->ratelimiter != NULL) {
+ ratelimitDestruct(pEntry->ratelimiter);
+ }
+ if(pEntry->stats != NULL) {
+ statsobj.Destruct(&pEntry->stats);
+ }
+ free(pEntry);
+ }
+ }
+
+ RETiRet;
+}
+
+
+/* configure TCP listener settings.
+ * Note: pszPort is handed over to us - the caller MUST NOT free it!
+ * rgerhards, 2008-03-20
+ */
+static rsRetVal ATTR_NONNULL(1,2)
+configureTCPListen(tcpsrv_t *const pThis, tcpLstnParams_t *const cnf_params)
+{
+ assert(cnf_params->pszPort != NULL);
+ int i;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ /* extract port */
+ const uchar *pPort = cnf_params->pszPort;
+ i = 0;
+ while(isdigit((int) *pPort)) {
+ i = i * 10 + *pPort++ - '0';
+ }
+
+ if(i >= 0 && i <= 65535) {
+ CHKiRet(addNewLstnPort(pThis, cnf_params));
+ } else {
+ LogError(0, NO_ERRCODE, "Invalid TCP listen port %s - ignored.\n", cnf_params->pszPort);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Initialize the session table
+ * returns 0 if OK, somewhat else otherwise
+ */
+static rsRetVal
+TCPSessTblInit(tcpsrv_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ assert(pThis->pSessions == NULL);
+
+ DBGPRINTF("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax);
+ if((pThis->pSessions = (tcps_sess_t **) calloc(pThis->iSessMax, sizeof(tcps_sess_t *))) == NULL) {
+ DBGPRINTF("Error: TCPSessInit() could not alloc memory for TCP session table.\n");
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* find a free spot in the session table. If the table
+ * is full, -1 is returned, else the index of the free
+ * entry (0 or higher).
+ */
+static int
+TCPSessTblFindFreeSpot(tcpsrv_t *pThis)
+{
+ register int i;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ for(i = 0 ; i < pThis->iSessMax ; ++i) {
+ if(pThis->pSessions[i] == NULL)
+ break;
+ }
+
+ return((i < pThis->iSessMax) ? i : -1);
+}
+
+
+/* Get the next session index. Free session tables entries are
+ * skipped. This function is provided the index of the last
+ * session entry, or -1 if no previous entry was obtained. It
+ * returns the index of the next session or -1, if there is no
+ * further entry in the table. Please note that the initial call
+ * might as well return -1, if there is no session at all in the
+ * session table.
+ */
+static int
+TCPSessGetNxtSess(tcpsrv_t *pThis, int iCurr)
+{
+ register int i;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ assert(pThis->pSessions != NULL);
+ for(i = iCurr + 1 ; i < pThis->iSessMax ; ++i) {
+ if(pThis->pSessions[i] != NULL)
+ break;
+ }
+
+ return((i < pThis->iSessMax) ? i : -1);
+}
+
+
+/* De-Initialize TCP listner sockets.
+ * This function deinitializes everything, including freeing the
+ * session table. No TCP listen receive operations are permitted
+ * unless the subsystem is reinitialized.
+ * rgerhards, 2007-06-21
+ */
+static void ATTR_NONNULL()
+deinit_tcp_listener(tcpsrv_t *const pThis)
+{
+ int i;
+ tcpLstnPortList_t *pEntry;
+ tcpLstnPortList_t *pDel;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ if(pThis->pSessions != NULL) {
+ /* close all TCP connections! */
+ if(!pThis->bUsingEPoll) {
+ i = TCPSessGetNxtSess(pThis, -1);
+ while(i != -1) {
+ tcps_sess.Destruct(&pThis->pSessions[i]);
+ /* now get next... */
+ i = TCPSessGetNxtSess(pThis, i);
+ }
+ }
+
+ /* we are done with the session table - so get rid of it... */
+ free(pThis->pSessions);
+ pThis->pSessions = NULL; /* just to make sure... */
+ }
+
+ /* free list of tcp listen ports */
+ pEntry = pThis->pLstnPorts;
+ while(pEntry != NULL) {
+ prop.Destruct(&pEntry->cnf_params->pInputName);
+ free((void*)pEntry->cnf_params->pszInputName);
+ free((void*)pEntry->cnf_params->pszPort);
+ free((void*)pEntry->cnf_params->pszAddr);
+ free((void*)pEntry->cnf_params->pszLstnPortFileName);
+ free((void*)pEntry->cnf_params);
+ ratelimitDestruct(pEntry->ratelimiter);
+ statsobj.Destruct(&(pEntry->stats));
+ pDel = pEntry;
+ pEntry = pEntry->pNext;
+ free(pDel);
+ }
+
+ /* finally close our listen streams */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ netstrm.Destruct(pThis->ppLstn + i);
+ }
+}
+
+
+/* add a listen socket to our listen socket array. This is a callback
+ * invoked from the netstrm class. -- rgerhards, 2008-04-23
+ */
+static rsRetVal
+addTcpLstn(void *pUsr, netstrm_t *pLstn)
+{
+ tcpLstnPortList_t *pPortList = (tcpLstnPortList_t *) pUsr;
+ tcpsrv_t *pThis = pPortList->pSrv;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ ISOBJ_TYPE_assert(pLstn, netstrm);
+
+ if(pThis->iLstnCurr >= pThis->iLstnMax)
+ ABORT_FINALIZE(RS_RET_MAX_LSTN_REACHED);
+
+ pThis->ppLstn[pThis->iLstnCurr] = pLstn;
+ pThis->ppLstnPort[pThis->iLstnCurr] = pPortList;
+ ++pThis->iLstnCurr;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Initialize TCP listener socket for a single port
+ * Note: at this point, TLS vs. non-TLS does not matter; TLS params are
+ * set on connect!
+ * rgerhards, 2009-05-21
+ */
+static rsRetVal
+initTCPListener(tcpsrv_t *pThis, tcpLstnPortList_t *pPortEntry)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ assert(pPortEntry != NULL);
+
+ // pPortEntry->pszAddr = NULL ==> bind to all interfaces
+ CHKiRet(netstrm.LstnInit(pThis->pNS, (void*)pPortEntry, addTcpLstn,
+ pThis->iSessMax, pPortEntry->cnf_params));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Initialize TCP sockets (for listener) and listens on them */
+static rsRetVal
+create_tcp_socket(tcpsrv_t *pThis)
+{
+ DEFiRet;
+ rsRetVal localRet;
+ tcpLstnPortList_t *pEntry;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ /* init all configured ports */
+ pEntry = pThis->pLstnPorts;
+ while(pEntry != NULL) {
+ localRet = initTCPListener(pThis, pEntry);
+ if(localRet != RS_RET_OK) {
+ LogError(0, localRet, "Could not create tcp listener, ignoring port "
+ "%s bind-address %s.",
+ (pEntry->cnf_params->pszPort == NULL) ? "**UNSPECIFIED**"
+ : (const char*) pEntry->cnf_params->pszPort,
+ (pEntry->cnf_params->pszAddr == NULL) ? "**UNSPECIFIED**"
+ : (const char*)pEntry->cnf_params->pszAddr);
+ }
+ pEntry = pEntry->pNext;
+ }
+
+ /* OK, we had success. Now it is also time to
+ * initialize our connections
+ */
+ if(TCPSessTblInit(pThis) != 0) {
+ /* OK, we are in some trouble - we could not initialize the
+ * session table, so we can not continue. We need to free all
+ * we have assigned so far, because we can not really use it...
+ */
+ LogError(0, RS_RET_ERR, "Could not initialize TCP session table, suspending TCP "
+ "message reception.");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Accept new TCP connection; make entry in session table. If there
+ * is no more space left in the connection table, the new TCP
+ * connection is immediately dropped.
+ * ppSess has a pointer to the newly created session, if it succeeds.
+ * If it does not succeed, no session is created and ppSess is
+ * undefined. If the user has provided an OnSessAccept Callback,
+ * this one is executed immediately after creation of the
+ * session object, so that it can do its own initialization.
+ * rgerhards, 2008-03-02
+ */
+static rsRetVal
+SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, netstrm_t *pStrm)
+{
+ DEFiRet;
+ tcps_sess_t *pSess = NULL;
+ netstrm_t *pNewStrm = NULL;
+ const tcpLstnParams_t *const cnf_params = pLstnInfo->cnf_params;
+ int iSess = -1;
+ struct sockaddr_storage *addr;
+ uchar *fromHostFQDN = NULL;
+ prop_t *fromHostIP = NULL;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ assert(pLstnInfo != NULL);
+
+ CHKiRet(netstrm.AcceptConnReq(pStrm, &pNewStrm));
+
+ /* Add to session list */
+ iSess = TCPSessTblFindFreeSpot(pThis);
+ if(iSess == -1) {
+ errno = 0;
+ LogError(0, RS_RET_MAX_SESS_REACHED, "too many tcp sessions - dropping incoming request");
+ ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
+ }
+
+ if(pThis->bUseKeepAlive) {
+ CHKiRet(netstrm.SetKeepAliveProbes(pNewStrm, pThis->iKeepAliveProbes));
+ CHKiRet(netstrm.SetKeepAliveTime(pNewStrm, pThis->iKeepAliveTime));
+ CHKiRet(netstrm.SetKeepAliveIntvl(pNewStrm, pThis->iKeepAliveIntvl));
+ CHKiRet(netstrm.EnableKeepAlive(pNewStrm));
+ }
+
+ /* we found a free spot and can construct our session object */
+ if(pThis->gnutlsPriorityString != NULL) {
+ CHKiRet(netstrm.SetGnutlsPriorityString(pNewStrm, pThis->gnutlsPriorityString));
+ }
+ CHKiRet(tcps_sess.Construct(&pSess));
+ CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
+ CHKiRet(tcps_sess.SetLstnInfo(pSess, pLstnInfo));
+ if(pThis->OnMsgReceive != NULL)
+ CHKiRet(tcps_sess.SetOnMsgReceive(pSess, pThis->OnMsgReceive));
+
+ /* get the host name */
+ CHKiRet(netstrm.GetRemoteHName(pNewStrm, &fromHostFQDN));
+ if (!cnf_params->bPreserveCase) {
+ /* preserve_case = off */
+ uchar *p;
+ for(p = fromHostFQDN; *p; p++) {
+ if (isupper((int) *p)) {
+ *p = tolower((int) *p);
+ }
+ }
+ }
+ CHKiRet(netstrm.GetRemoteIP(pNewStrm, &fromHostIP));
+ CHKiRet(netstrm.GetRemAddr(pNewStrm, &addr));
+ /* TODO: check if we need to strip the domain name here -- rgerhards, 2008-04-24 */
+
+ /* Here we check if a host is permitted to send us messages. If it isn't, we do not further
+ * process the message but log a warning (if we are configured to do this).
+ * rgerhards, 2005-09-26
+ */
+ if(!pThis->pIsPermittedHost((struct sockaddr*) addr, (char*) fromHostFQDN, pThis->pUsr, pSess->pUsr)) {
+ DBGPRINTF("%s is not an allowed sender\n", fromHostFQDN);
+ if(glbl.GetOptionDisallowWarning(runConf)) {
+ errno = 0;
+ LogError(0, RS_RET_HOST_NOT_PERMITTED, "connection request from disallowed "
+ "sender %s discarded", fromHostFQDN);
+ }
+ ABORT_FINALIZE(RS_RET_HOST_NOT_PERMITTED);
+ }
+
+ /* OK, we have an allowed sender, so let's continue, what
+ * means we can finally fill in the session object.
+ */
+ CHKiRet(tcps_sess.SetHost(pSess, fromHostFQDN));
+ fromHostFQDN = NULL; /* we handed this string over */
+ CHKiRet(tcps_sess.SetHostIP(pSess, fromHostIP));
+ CHKiRet(tcps_sess.SetStrm(pSess, pNewStrm));
+ pNewStrm = NULL; /* prevent it from being freed in error handler, now done in tcps_sess! */
+ CHKiRet(tcps_sess.SetMsgIdx(pSess, 0));
+ CHKiRet(tcps_sess.ConstructFinalize(pSess));
+
+ /* check if we need to call our callback */
+ if(pThis->pOnSessAccept != NULL) {
+ CHKiRet(pThis->pOnSessAccept(pThis, pSess));
+ }
+
+ *ppSess = pSess;
+ if(!pThis->bUsingEPoll)
+ pThis->pSessions[iSess] = pSess;
+ pSess = NULL; /* this is now also handed over */
+
+ if(pThis->bEmitMsgOnOpen) {
+ LogMsg(0, RS_RET_NO_ERRCODE, LOG_INFO,
+ "imtcp: connection established with host: %s",
+ propGetSzStr(fromHostIP));
+ }
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(iRet != RS_RET_HOST_NOT_PERMITTED && pThis->bEmitMsgOnOpen) {
+ LogError(0, NO_ERRCODE, "imtcp: connection could not be "
+ "established with host: %s",
+ fromHostIP == NULL ? "(IP unknown)"
+ : (const char*)propGetSzStr(fromHostIP));
+ }
+ if(pSess != NULL)
+ tcps_sess.Destruct(&pSess);
+ if(pNewStrm != NULL)
+ netstrm.Destruct(&pNewStrm);
+ free(fromHostFQDN);
+ }
+
+ RETiRet;
+}
+
+
+static void
+RunCancelCleanup(void *arg)
+{
+ nspoll_t **ppPoll = (nspoll_t**) arg;
+
+ if (*ppPoll != NULL)
+ nspoll.Destruct(ppPoll);
+
+ /* Wait for any running workers to finish */
+ pthread_mutex_lock(&wrkrMut);
+ DBGPRINTF("tcpsrv terminating, waiting for %d workers\n", wrkrRunning);
+ while(wrkrRunning > 0) {
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+}
+
+static void
+RunSelectCancelCleanup(void *arg)
+{
+ nssel_t **ppSel = (nssel_t**) arg;
+
+ if(*ppSel != NULL)
+ nssel.Destruct(ppSel);
+}
+
+
+/* helper to close a session. Takes status of poll vs. select into consideration.
+ * rgerhards, 2009-11-25
+ */
+static rsRetVal
+closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) {
+ DEFiRet;
+ if(pPoll != NULL) {
+ CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
+ }
+ pThis->pOnRegularClose(*ppSess);
+ tcps_sess.Destruct(ppSess);
+finalize_it:
+ RETiRet;
+}
+
+
+/* process a receive request on one of the streams
+ * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need
+ * to remove any descriptor we close from the epoll set.
+ * rgerhards, 2009-07-020
+ */
+static rsRetVal
+doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
+{
+ char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
+ ssize_t iRcvd;
+ rsRetVal localRet;
+ DEFiRet;
+ uchar *pszPeer;
+ int lenPeer;
+ int oserr = 0;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ prop.GetString((*ppSess)->fromHostIP, &pszPeer, &lenPeer);
+ DBGPRINTF("netstream %p with new data from remote peer %s\n", (*ppSess)->pStrm, pszPeer);
+ /* Receive message */
+ iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd, &oserr);
+ switch(iRet) {
+ case RS_RET_CLOSED:
+ if(pThis->bEmitMsgOnClose) {
+ errno = 0;
+ LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote "
+ "peer %s.\n", (*ppSess)->pStrm, pszPeer);
+ }
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
+ break;
+ case RS_RET_RETRY:
+ /* we simply ignore retry - this is not an error, but we also have not received anything */
+ break;
+ case RS_RET_OK:
+ /* valid data received, process it! */
+ localRet = tcps_sess.DataRcvd(*ppSess, buf, iRcvd);
+ if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) {
+ /* in this case, something went awfully wrong.
+ * We are instructed to terminate the session.
+ */
+ LogError(oserr, localRet, "Tearing down TCP Session from %s", pszPeer);
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
+ }
+ break;
+ default:
+ LogError(oserr, iRet, "netstream session %p from %s will be closed due to error",
+ (*ppSess)->pStrm, pszPeer);
+ CHKiRet(closeSess(pThis, ppSess, pPoll));
+ break;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/* process a single workset item
+ */
+static rsRetVal ATTR_NONNULL(1)
+processWorksetItem(tcpsrv_t *const pThis, nspoll_t *pPoll, const int idx, void *pUsr)
+{
+ tcps_sess_t *pNewSess = NULL;
+ tcpLstnParams_t *cnf_params;
+
+ DEFiRet;
+
+ DBGPRINTF("tcpsrv: processing item %d, pUsr %p, bAbortConn\n", idx, pUsr);
+ if(pUsr == pThis->ppLstn) {
+ DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
+ iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
+ cnf_params = pThis->ppLstnPort[idx]->cnf_params;
+ if(iRet == RS_RET_OK) {
+ if(pPoll != NULL) {
+ CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ }
+ DBGPRINTF("New session created with NSD %p.\n", pNewSess);
+ } else {
+ DBGPRINTF("tcpsrv: error %d during accept\n", iRet);
+ }
+ } else {
+ pNewSess = (tcps_sess_t*) pUsr;
+ cnf_params = pNewSess->pLstnInfo->cnf_params;
+ doReceive(pThis, &pNewSess, pPoll);
+ if(pPoll == NULL && pNewSess == NULL) {
+ pThis->pSessions[idx] = NULL;
+ }
+ }
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ LogError(0, iRet, "tcpsrv listener (inputname: '%s') failed "
+ "to process incoming connection with error %d",
+ (cnf_params->pszInputName == NULL) ? (uchar*)"*UNSET*" : cnf_params->pszInputName, iRet);
+ srSleep(0,20000); /* Sleep 20ms */
+ }
+ RETiRet;
+}
+
+
+/* worker to process incoming requests
+ */
+static void * ATTR_NONNULL(1)
+wrkr(void *const myself)
+{
+ struct wrkrInfo_s *const me = (struct wrkrInfo_s*) myself;
+
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ // wait for work, in which case pSrv will be populated
+ while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(me->pSrv == NULL) {
+ // only possible if glbl.GetGlobalInputTermState() == 1
+ // we need to query me->opSrv to avoid clang static
+ // analyzer false positive! -- rgerhards, 2017-10-23
+ assert(glbl.GetGlobalInputTermState() == 1);
+ break;
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ ++me->numCalled;
+ processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->pSrv = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ pthread_cond_broadcast(&wrkrIdle);
+ }
+ me->enabled = 0; /* indicate we are no longer available */
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+/* This has been factored out from processWorkset() because
+ * pthread_cleanup_push() invokes setjmp() and this triggers the -Wclobbered
+ * warning for the iRet variable.
+ */
+static void
+waitForWorkers(void)
+{
+ pthread_mutex_lock(&wrkrMut);
+ pthread_cleanup_push(mutexCancelCleanup, &wrkrMut);
+ while(wrkrRunning > 0) {
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+ }
+ pthread_cleanup_pop(1);
+}
+
+/* Process a workset, that is handle io. We become activated
+ * from either select or epoll handler. We split the workload
+ * out to a pool of threads, but try to avoid context switches
+ * as much as possible.
+ */
+static rsRetVal
+processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
+{
+ int i;
+ int origEntries = numEntries;
+ DEFiRet;
+
+ DBGPRINTF("tcpsrv: ready to process %d event entries\n", numEntries);
+
+ while(numEntries > 0) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ if(numEntries == 1) {
+ /* process self, save context switch */
+ iRet = processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr);
+ } else {
+ /* No cancel handler needed here, since no cancellation
+ * points are executed while wrkrMut is locked.
+ *
+ * Re-evaluate this if you add a DBGPRINTF or something!
+ */
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < wrkrMax) && ((wrkrInfo[i].pSrv != NULL) || (wrkrInfo[i].enabled == 0)) ; ++i)
+ /*do search*/;
+ if(i < wrkrMax) {
+ /* worker free -> use it! */
+ wrkrInfo[i].pSrv = pThis;
+ wrkrInfo[i].pPoll = pPoll;
+ wrkrInfo[i].idx = workset[numEntries -1].id;
+ wrkrInfo[i].pUsr = workset[numEntries -1].pUsr;
+ /* Note: we must increment wrkrRunning HERE and not inside the worker's
+ * code. This is because a worker may actually never start, and thus
+ * increment wrkrRunning, before we finish and check the running worker
+ * count. We can only avoid this by incrementing it here.
+ */
+ ++wrkrRunning;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ iRet = processWorksetItem(pThis, pPoll, workset[numEntries-1].id,
+ workset[numEntries-1].pUsr);
+ }
+ }
+ --numEntries;
+ }
+
+ if(origEntries > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ waitForWorkers();
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called to gather input.
+ * This variant here is only used if we need to work with a netstream driver
+ * that does not support epoll().
+ */
+PRAGMA_DIAGNOSTIC_PUSH
+PRAGMA_IGNORE_Wempty_body
+static rsRetVal
+RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset)
+{
+ DEFiRet;
+ int nfds;
+ int i;
+ int iWorkset;
+ int iTCPSess;
+ int bIsReady;
+ nssel_t *pSel = NULL;
+ rsRetVal localRet;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ pthread_cleanup_push(RunSelectCancelCleanup, (void*) &pSel);
+ while(1) {
+ CHKiRet(nssel.Construct(&pSel));
+ if(pThis->pszDrvrName != NULL)
+ CHKiRet(nssel.SetDrvrName(pSel, pThis->pszDrvrName));
+ CHKiRet(nssel.ConstructFinalize(pSel));
+
+ /* Add the TCP listen sockets to the list of read descriptors. */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ CHKiRet(nssel.Add(pSel, pThis->ppLstn[i], NSDSEL_RD));
+ }
+
+ /* do the sessions */
+ iTCPSess = TCPSessGetNxtSess(pThis, -1);
+ while(iTCPSess != -1) {
+ /* TODO: access to pNsd is NOT really CLEAN, use method... */
+ CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD));
+ DBGPRINTF("tcpsrv process session %d:\n", iTCPSess);
+
+ /* now get next... */
+ iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
+ }
+
+ /* wait for io to become ready */
+ CHKiRet(nssel.Wait(pSel, &nfds));
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
+
+ iWorkset = 0;
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
+ if(bIsReady) {
+ workset[iWorkset].id = i;
+ workset[iWorkset].pUsr = (void*) pThis->ppLstn;
+ /* this is a flag to indicate listen sock */
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
+ --nfds; /* indicate we have processed one */
+ }
+ }
+
+ /* now check the sessions */
+ iTCPSess = TCPSessGetNxtSess(pThis, -1);
+ while(nfds && iTCPSess != -1) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ localRet = nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD,
+ &bIsReady, &nfds);
+ if(bIsReady || localRet != RS_RET_OK) {
+ workset[iWorkset].id = iTCPSess;
+ workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess];
+ ++iWorkset;
+ if(iWorkset >= (int) sizeWorkset) {
+ processWorkset(pThis, NULL, iWorkset, workset);
+ iWorkset = 0;
+ }
+ if(bIsReady)
+ --nfds; /* indicate we have processed one */
+ }
+ iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
+ }
+
+ if(iWorkset > 0)
+ processWorkset(pThis, NULL, iWorkset, workset);
+
+ /* we need to copy back close descriptors */
+ nssel.Destruct(&pSel); /* no iRet check as it is overriden at start of loop! */
+finalize_it: /* this is a very special case - this time only we do not exit the function,
+ * because that would not help us either. So we simply retry it. Let's see
+ * if that actually is a better idea. Exiting the loop wasn't we always
+ * crashed, which made sense (the rest of the engine was not prepared for
+ * that) -- rgerhards, 2008-05-19
+ */
+ if(pSel != NULL) { /* cleanup missing? happens during err exit! */
+ nssel.Destruct(&pSel);
+ }
+ }
+
+ pthread_cleanup_pop(1); /* execute and remove cleanup handler */
+
+ RETiRet;
+}
+PRAGMA_DIAGNOSTIC_POP
+
+static rsRetVal
+DoRun(tcpsrv_t *pThis, nspoll_t **ppPoll)
+{
+ DEFiRet;
+ int i;
+ nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */
+ int numEntries;
+ nspoll_t *pPoll = NULL;
+ rsRetVal localRet;
+
+ if((localRet = nspoll.Construct(ppPoll)) == RS_RET_OK) {
+ pPoll = *ppPoll;
+ if(pThis->pszDrvrName != NULL)
+ CHKiRet(nspoll.SetDrvrName(pPoll, pThis->pszDrvrName));
+ localRet = nspoll.ConstructFinalize(pPoll);
+ }
+ if(localRet != RS_RET_OK) {
+ /* fall back to select */
+ DBGPRINTF("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet);
+ iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t));
+ FINALIZE;
+ }
+
+ DBGPRINTF("tcpsrv uses epoll() interface, nsdpoll driver found\n");
+
+ /* flag that we are in epoll mode */
+ pThis->bUsingEPoll = RSTRUE;
+
+ /* Add the TCP listen sockets to the list of sockets to monitor */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ DBGPRINTF("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn);
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD));
+ DBGPRINTF("Added listener %d\n", i);
+ }
+
+ while(glbl.GetGlobalInputTermState() == 0) {
+ numEntries = sizeof(workset)/sizeof(nsd_epworkset_t);
+ localRet = nspoll.Wait(pPoll, -1, &numEntries, workset);
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
+
+ /* check if we need to ignore the i/o ready state. We do this if we got an invalid
+ * return state. Validly, this can happen for RS_RET_EINTR, for other cases it may
+ * not be the right thing, but what is the right thing is really hard at this point...
+ */
+ if(localRet != RS_RET_OK)
+ continue;
+
+ processWorkset(pThis, pPoll, numEntries, workset);
+ }
+
+ /* remove the tcp listen sockets from the epoll set */
+ for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
+ CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* This function is called to gather input. It tries doing that via the epoll()
+ * interface. If the driver does not support that, it falls back to calling its
+ * select() equivalent.
+ * rgerhards, 2009-11-18
+ */
+static rsRetVal
+Run(tcpsrv_t *pThis)
+{
+ DEFiRet;
+ nspoll_t *pPoll = NULL;
+
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ if(pThis->iLstnCurr == 0) {
+ dbgprintf("tcpsrv: no listeneres at all (probably init error), terminating\n");
+ RETiRet; /* somewhat "dirty" exit to avoid issue with cancel handler */
+ }
+
+ /* check if we need to start the worker pool. Once it is running, all is
+ * well. Shutdown is done on modExit.
+ */
+ d_pthread_mutex_lock(&wrkrMut);
+ if(!bWrkrRunning) {
+ bWrkrRunning = 1;
+ startWorkerPool();
+ }
+ d_pthread_mutex_unlock(&wrkrMut);
+
+ /* We try to terminate cleanly, but install a cancellation clean-up
+ * handler in case we are cancelled.
+ */
+ pthread_cleanup_push(RunCancelCleanup, (void*) &pPoll);
+ iRet = DoRun(pThis, &pPoll);
+ pthread_cleanup_pop(1);
+
+ RETiRet;
+}
+
+
+/* Standard-Constructor */
+BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */
+ pThis->iSessMax = TCPSESS_MAX_DEFAULT;
+ pThis->iLstnMax = TCPLSTN_MAX_DEFAULT;
+ pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
+ pThis->maxFrameSize = 200000;
+ pThis->bDisableLFDelim = 0;
+ pThis->discardTruncatedMsg = 0;
+ pThis->OnMsgReceive = NULL;
+ pThis->dfltTZ[0] = '\0';
+ pThis->bSPFramingFix = 0;
+ pThis->ratelimitInterval = 0;
+ pThis->ratelimitBurst = 10000;
+ pThis->bUseFlowControl = 1;
+ pThis->pszDrvrName = NULL;
+ pThis->bPreserveCase = 1; /* preserve case in fromhost; default to true. */
+ pThis->DrvrTlsVerifyDepth = 0;
+ENDobjConstruct(tcpsrv)
+
+
+/* ConstructionFinalizer */
+static rsRetVal
+tcpsrvConstructFinalize(tcpsrv_t *pThis)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+
+ /* prepare network stream subsystem */
+ CHKiRet(netstrms.Construct(&pThis->pNS));
+ if(pThis->pszDrvrName != NULL)
+ CHKiRet(netstrms.SetDrvrName(pThis->pNS, pThis->pszDrvrName));
+ CHKiRet(netstrms.SetDrvrMode(pThis->pNS, pThis->iDrvrMode));
+ CHKiRet(netstrms.SetDrvrCheckExtendedKeyUsage(pThis->pNS, pThis->DrvrChkExtendedKeyUsage));
+ CHKiRet(netstrms.SetDrvrPrioritizeSAN(pThis->pNS, pThis->DrvrPrioritizeSan));
+ CHKiRet(netstrms.SetDrvrTlsVerifyDepth(pThis->pNS, pThis->DrvrTlsVerifyDepth));
+ if(pThis->pszDrvrAuthMode != NULL)
+ CHKiRet(netstrms.SetDrvrAuthMode(pThis->pNS, pThis->pszDrvrAuthMode));
+ /* Call SetDrvrPermitExpiredCerts required
+ * when param is NULL default handling for ExpiredCerts is set! */
+ CHKiRet(netstrms.SetDrvrPermitExpiredCerts(pThis->pNS, pThis->pszDrvrPermitExpiredCerts));
+ CHKiRet(netstrms.SetDrvrTlsCAFile(pThis->pNS, pThis->pszDrvrCAFile));
+ CHKiRet(netstrms.SetDrvrTlsCRLFile(pThis->pNS, pThis->pszDrvrCRLFile));
+ CHKiRet(netstrms.SetDrvrTlsKeyFile(pThis->pNS, pThis->pszDrvrKeyFile));
+ CHKiRet(netstrms.SetDrvrTlsCertFile(pThis->pNS, pThis->pszDrvrCertFile));
+ if(pThis->pPermPeers != NULL)
+ CHKiRet(netstrms.SetDrvrPermPeers(pThis->pNS, pThis->pPermPeers));
+ if(pThis->gnutlsPriorityString != NULL)
+ CHKiRet(netstrms.SetDrvrGnutlsPriorityString(pThis->pNS, pThis->gnutlsPriorityString));
+ CHKiRet(netstrms.ConstructFinalize(pThis->pNS));
+
+ /* set up listeners */
+ CHKmalloc(pThis->ppLstn = calloc(pThis->iLstnMax, sizeof(netstrm_t*)));
+ CHKmalloc(pThis->ppLstnPort = calloc(pThis->iLstnMax, sizeof(tcpLstnPortList_t*)));
+ iRet = pThis->OpenLstnSocks(pThis);
+
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pThis->pNS != NULL)
+ netstrms.Destruct(&pThis->pNS);
+ LogError(0, iRet, "tcpsrv could not create listener (inputname: '%s')",
+ (pThis->pszInputName == NULL) ? (uchar*)"*UNSET*" : pThis->pszInputName);
+ }
+ RETiRet;
+}
+
+
+/* destructor for the tcpsrv object */
+BEGINobjDestruct(tcpsrv) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(tcpsrv)
+ if(pThis->OnDestruct != NULL)
+ pThis->OnDestruct(pThis->pUsr);
+
+ deinit_tcp_listener(pThis);
+
+ if(pThis->pNS != NULL)
+ netstrms.Destruct(&pThis->pNS);
+ free(pThis->pszDrvrName);
+ free(pThis->pszDrvrAuthMode);
+ free(pThis->pszDrvrPermitExpiredCerts);
+ free(pThis->pszDrvrCAFile);
+ free(pThis->pszDrvrCRLFile);
+ free(pThis->pszDrvrKeyFile);
+ free(pThis->pszDrvrCertFile);
+ free(pThis->ppLstn);
+ free(pThis->ppLstnPort);
+ free(pThis->pszInputName);
+ free(pThis->pszOrigin);
+ENDobjDestruct(tcpsrv)
+
+
+/* debugprint for the tcpsrv object */
+BEGINobjDebugPrint(tcpsrv) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDebugPrint(tcpsrv)
+ENDobjDebugPrint(tcpsrv)
+
+/* set functions */
+static rsRetVal
+SetCBIsPermittedHost(tcpsrv_t *pThis, int (*pCB)(struct sockaddr *addr, char *fromHostFQDN, void*, void*))
+{
+ DEFiRet;
+ pThis->pIsPermittedHost = pCB;
+ RETiRet;
+}
+
+static rsRetVal
+SetCBRcvData(tcpsrv_t *pThis, rsRetVal (*pRcvData)(tcps_sess_t*, char*, size_t, ssize_t*, int*))
+{
+ DEFiRet;
+ pThis->pRcvData = pRcvData;
+ RETiRet;
+}
+
+static rsRetVal
+SetCBOnListenDeinit(tcpsrv_t *pThis, rsRetVal (*pCB)(void*))
+{
+ DEFiRet;
+ pThis->pOnListenDeinit = pCB;
+ RETiRet;
+}
+
+static rsRetVal
+SetCBOnSessAccept(tcpsrv_t *pThis, rsRetVal (*pCB)(tcpsrv_t*, tcps_sess_t*))
+{
+ DEFiRet;
+ pThis->pOnSessAccept = pCB;
+ RETiRet;
+}
+
+static rsRetVal
+SetCBOnDestruct(tcpsrv_t *pThis, rsRetVal (*pCB)(void*))
+{
+ DEFiRet;
+ pThis->OnDestruct = pCB;
+ RETiRet;
+}
+
+static rsRetVal
+SetCBOnSessConstructFinalize(tcpsrv_t *pThis, rsRetVal (*pCB)(void*))
+{
+ DEFiRet;
+ pThis->OnSessConstructFinalize = pCB;
+ RETiRet;
+}
+
+static rsRetVal
+SetCBOnSessDestruct(tcpsrv_t *pThis, rsRetVal (*pCB)(void*))
+{
+ DEFiRet;
+ pThis->pOnSessDestruct = pCB;
+ RETiRet;
+}
+
+static rsRetVal
+SetCBOnRegularClose(tcpsrv_t *pThis, rsRetVal (*pCB)(tcps_sess_t*))
+{
+ DEFiRet;
+ pThis->pOnRegularClose = pCB;
+ RETiRet;
+}
+
+static rsRetVal
+SetCBOnErrClose(tcpsrv_t *pThis, rsRetVal (*pCB)(tcps_sess_t*))
+{
+ DEFiRet;
+ pThis->pOnErrClose = pCB;
+ RETiRet;
+}
+
+static rsRetVal
+SetCBOpenLstnSocks(tcpsrv_t *pThis, rsRetVal (*pCB)(tcpsrv_t*))
+{
+ DEFiRet;
+ pThis->OpenLstnSocks = pCB;
+ RETiRet;
+}
+
+static rsRetVal
+SetUsrP(tcpsrv_t *pThis, void *pUsr)
+{
+ DEFiRet;
+ pThis->pUsr = pUsr;
+ RETiRet;
+}
+
+static rsRetVal
+SetKeepAlive(tcpsrv_t *pThis, int iVal)
+{
+ DEFiRet;
+ DBGPRINTF("tcpsrv: keep-alive set to %d\n", iVal);
+ pThis->bUseKeepAlive = iVal;
+ RETiRet;
+}
+
+static rsRetVal
+SetKeepAliveIntvl(tcpsrv_t *pThis, int iVal)
+{
+ DEFiRet;
+ DBGPRINTF("tcpsrv: keep-alive interval set to %d\n", iVal);
+ pThis->iKeepAliveIntvl = iVal;
+ RETiRet;
+}
+
+static rsRetVal
+SetKeepAliveProbes(tcpsrv_t *pThis, int iVal)
+{
+ DEFiRet;
+ DBGPRINTF("tcpsrv: keep-alive probes set to %d\n", iVal);
+ pThis->iKeepAliveProbes = iVal;
+ RETiRet;
+}
+
+static rsRetVal
+SetKeepAliveTime(tcpsrv_t *pThis, int iVal)
+{
+ DEFiRet;
+ DBGPRINTF("tcpsrv: keep-alive timeout set to %d\n", iVal);
+ pThis->iKeepAliveTime = iVal;
+ RETiRet;
+}
+
+static rsRetVal
+SetGnutlsPriorityString(tcpsrv_t *pThis, uchar *iVal)
+{
+ DEFiRet;
+ DBGPRINTF("tcpsrv: gnutlsPriorityString set to %s\n",
+ (iVal == NULL) ? "(null)" : (const char*) iVal);
+ pThis->gnutlsPriorityString = iVal;
+ RETiRet;
+}
+
+
+static rsRetVal
+SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int))
+{
+ DEFiRet;
+ assert(OnMsgReceive != NULL);
+ pThis->OnMsgReceive = OnMsgReceive;
+ RETiRet;
+}
+
+
+/* set enable/disable standard LF frame delimiter (use with care!)
+ * -- rgerhards, 2010-01-03
+ */
+static rsRetVal
+SetbDisableLFDelim(tcpsrv_t *pThis, int bVal)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->bDisableLFDelim = bVal;
+ RETiRet;
+}
+
+
+/* discard the truncated msg part
+ * -- PascalWithopf, 2017-04-20
+ */
+static rsRetVal
+SetDiscardTruncatedMsg(tcpsrv_t *pThis, int discard)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->discardTruncatedMsg = discard;
+ RETiRet;
+}
+
+
+/* Set additional framing to use (if any) -- rgerhards, 2008-12-10 */
+static rsRetVal
+SetAddtlFrameDelim(tcpsrv_t *pThis, int iDelim)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->addtlFrameDelim = iDelim;
+ RETiRet;
+}
+
+
+/* Set max frame size for octet counted -- PascalWithopf, 2017-04-20*/
+static rsRetVal
+SetMaxFrameSize(tcpsrv_t *pThis, int maxFrameSize)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->maxFrameSize = maxFrameSize;
+ RETiRet;
+}
+
+
+static rsRetVal
+SetDfltTZ(tcpsrv_t *const pThis, uchar *const tz)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ strncpy((char*)pThis->dfltTZ, (char*)tz, sizeof(pThis->dfltTZ));
+ pThis->dfltTZ[sizeof(pThis->dfltTZ)-1] = '\0';
+ RETiRet;
+}
+
+
+static rsRetVal
+SetbSPFramingFix(tcpsrv_t *pThis, const sbool val)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->bSPFramingFix = val;
+ RETiRet;
+}
+
+static rsRetVal
+SetOrigin(tcpsrv_t *pThis, uchar *origin)
+{
+ DEFiRet;
+ free(pThis->pszOrigin);
+ pThis->pszOrigin = (origin == NULL) ? NULL : ustrdup(origin);
+ RETiRet;
+}
+
+/* Set the input name to use -- rgerhards, 2008-12-10 */
+static rsRetVal
+SetInputName(tcpsrv_t *const pThis,tcpLstnParams_t *const cnf_params, const uchar *const name)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ if(name == NULL)
+ cnf_params->pszInputName = NULL;
+ else
+ CHKmalloc(cnf_params->pszInputName = ustrdup(name));
+ free(pThis->pszInputName); // TODO: REMOVE ME
+ pThis->pszInputName = ustrdup("imtcp"); // TODO: REMOVE ME
+
+ /* we need to create a property */
+ CHKiRet(prop.Construct(&cnf_params->pInputName));
+ CHKiRet(prop.SetString(cnf_params->pInputName, cnf_params->pszInputName, ustrlen(cnf_params->pszInputName)));
+ CHKiRet(prop.ConstructFinalize(cnf_params->pInputName));
+finalize_it:
+ RETiRet;
+}
+
+
+/* Set the linux-like ratelimiter settings */
+static rsRetVal
+SetLinuxLikeRatelimiters(tcpsrv_t *pThis, unsigned int ratelimitInterval, unsigned int ratelimitBurst)
+{
+ DEFiRet;
+ pThis->ratelimitInterval = ratelimitInterval;
+ pThis->ratelimitBurst = ratelimitBurst;
+ RETiRet;
+}
+
+
+/* Set connection open notification */
+static rsRetVal
+SetNotificationOnRemoteOpen(tcpsrv_t *pThis, const int bNewVal)
+{
+ pThis->bEmitMsgOnOpen = bNewVal;
+ return RS_RET_OK;
+}
+/* Set connection close notification */
+static rsRetVal
+SetNotificationOnRemoteClose(tcpsrv_t *pThis, const int bNewVal)
+{
+ DEFiRet;
+ pThis->bEmitMsgOnClose = bNewVal;
+ RETiRet;
+}
+
+
+/* here follows a number of methods that shuffle authentication settings down
+ * to the drivers. Drivers not supporting these settings may return an error
+ * state.
+ * -------------------------------------------------------------------------- */
+
+/* set the driver mode -- rgerhards, 2008-04-30 */
+static rsRetVal
+SetDrvrMode(tcpsrv_t *pThis, const int iMode)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->iDrvrMode = iMode;
+ RETiRet;
+}
+
+static rsRetVal
+SetDrvrName(tcpsrv_t *pThis, uchar *const name)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ free(pThis->pszDrvrName);
+ CHKmalloc(pThis->pszDrvrName = ustrdup(name));
+finalize_it:
+ RETiRet;
+}
+
+/* set the driver authentication mode -- rgerhards, 2008-05-19 */
+static rsRetVal
+SetDrvrAuthMode(tcpsrv_t *pThis, uchar *const mode)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ CHKmalloc(pThis->pszDrvrAuthMode = ustrdup(mode));
+finalize_it:
+ RETiRet;
+}
+
+/* set the driver permitexpiredcerts mode -- alorbach, 2018-12-20
+ */
+static rsRetVal
+SetDrvrPermitExpiredCerts(tcpsrv_t *pThis, uchar *mode)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ if (mode != NULL) {
+ CHKmalloc(pThis->pszDrvrPermitExpiredCerts = ustrdup(mode));
+ }
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+SetDrvrCAFile(tcpsrv_t *const pThis, uchar *const mode)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ if (mode != NULL) {
+ CHKmalloc(pThis->pszDrvrCAFile = ustrdup(mode));
+ }
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+SetDrvrCRLFile(tcpsrv_t *const pThis, uchar *const mode)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ if (mode != NULL) {
+ CHKmalloc(pThis->pszDrvrCRLFile = ustrdup(mode));
+ }
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+SetDrvrKeyFile(tcpsrv_t *pThis, uchar *mode)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ if (mode != NULL) {
+ CHKmalloc(pThis->pszDrvrKeyFile = ustrdup(mode));
+ }
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+SetDrvrCertFile(tcpsrv_t *pThis, uchar *mode)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ if (mode != NULL) {
+ CHKmalloc(pThis->pszDrvrCertFile = ustrdup(mode));
+ }
+finalize_it:
+ RETiRet;
+}
+
+
+/* set the driver's permitted peers -- rgerhards, 2008-05-19 */
+static rsRetVal
+SetDrvrPermPeers(tcpsrv_t *pThis, permittedPeers_t *pPermPeers)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->pPermPeers = pPermPeers;
+ RETiRet;
+}
+
+/* set the driver cert extended key usage check setting -- jvymazal, 2019-08-16 */
+static rsRetVal
+SetDrvrCheckExtendedKeyUsage(tcpsrv_t *pThis, int ChkExtendedKeyUsage)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->DrvrChkExtendedKeyUsage = ChkExtendedKeyUsage;
+ RETiRet;
+}
+
+/* set the driver name checking policy -- jvymazal, 2019-08-16 */
+static rsRetVal
+SetDrvrPrioritizeSAN(tcpsrv_t *pThis, int prioritizeSan)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->DrvrPrioritizeSan = prioritizeSan;
+ RETiRet;
+}
+
+/* set the driver Set the driver tls verifyDepth -- alorbach, 2019-12-20 */
+static rsRetVal
+SetDrvrTlsVerifyDepth(tcpsrv_t *pThis, int verifyDepth)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->DrvrTlsVerifyDepth = verifyDepth;
+ RETiRet;
+}
+
+/* End of methods to shuffle autentication settings to the driver.;
+
+ * -------------------------------------------------------------------------- */
+
+
+/* set max number of listeners
+ * this must be called before ConstructFinalize, or it will have no effect!
+ * rgerhards, 2009-08-17
+ */
+static rsRetVal
+SetLstnMax(tcpsrv_t *pThis, int iMax)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->iLstnMax = iMax;
+ RETiRet;
+}
+
+
+/* set if flow control shall be supported
+ */
+static rsRetVal
+SetUseFlowControl(tcpsrv_t *pThis, int bUseFlowControl)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->bUseFlowControl = bUseFlowControl;
+ RETiRet;
+}
+
+
+/* set max number of sessions
+ * this must be called before ConstructFinalize, or it will have no effect!
+ * rgerhards, 2009-04-09
+ */
+static rsRetVal
+SetSessMax(tcpsrv_t *pThis, int iMax)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis->iSessMax = iMax;
+ RETiRet;
+}
+
+
+static rsRetVal
+SetPreserveCase(tcpsrv_t *pThis, int bPreserveCase)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, tcpsrv);
+ pThis-> bPreserveCase = bPreserveCase;
+ RETiRet;
+}
+
+
+/* queryInterface function
+ * rgerhards, 2008-02-29
+ */
+BEGINobjQueryInterface(tcpsrv)
+CODESTARTobjQueryInterface(tcpsrv)
+ if(pIf->ifVersion != tcpsrvCURR_IF_VERSION) { /* check for current version, increment on each change */
+ ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
+ }
+
+ /* ok, we have the right interface, so let's fill it
+ * Please note that we may also do some backwards-compatibility
+ * work here (if we can support an older interface version - that,
+ * of course, also affects the "if" above).
+ */
+ pIf->DebugPrint = tcpsrvDebugPrint;
+ pIf->Construct = tcpsrvConstruct;
+ pIf->ConstructFinalize = tcpsrvConstructFinalize;
+ pIf->Destruct = tcpsrvDestruct;
+
+ pIf->configureTCPListen = configureTCPListen;
+ pIf->create_tcp_socket = create_tcp_socket;
+ pIf->Run = Run;
+
+ pIf->SetKeepAlive = SetKeepAlive;
+ pIf->SetKeepAliveIntvl = SetKeepAliveIntvl;
+ pIf->SetKeepAliveProbes = SetKeepAliveProbes;
+ pIf->SetKeepAliveTime = SetKeepAliveTime;
+ pIf->SetGnutlsPriorityString = SetGnutlsPriorityString;
+ pIf->SetUsrP = SetUsrP;
+ pIf->SetInputName = SetInputName;
+ pIf->SetOrigin = SetOrigin;
+ pIf->SetDfltTZ = SetDfltTZ;
+ pIf->SetbSPFramingFix = SetbSPFramingFix;
+ pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
+ pIf->SetMaxFrameSize = SetMaxFrameSize;
+ pIf->SetbDisableLFDelim = SetbDisableLFDelim;
+ pIf->SetDiscardTruncatedMsg = SetDiscardTruncatedMsg;
+ pIf->SetSessMax = SetSessMax;
+ pIf->SetUseFlowControl = SetUseFlowControl;
+ pIf->SetLstnMax = SetLstnMax;
+ pIf->SetDrvrMode = SetDrvrMode;
+ pIf->SetDrvrAuthMode = SetDrvrAuthMode;
+ pIf->SetDrvrPermitExpiredCerts = SetDrvrPermitExpiredCerts;
+ pIf->SetDrvrCAFile = SetDrvrCAFile;
+ pIf->SetDrvrCRLFile = SetDrvrCRLFile;
+ pIf->SetDrvrKeyFile = SetDrvrKeyFile;
+ pIf->SetDrvrCertFile = SetDrvrCertFile;
+ pIf->SetDrvrName = SetDrvrName;
+ pIf->SetDrvrPermPeers = SetDrvrPermPeers;
+ pIf->SetCBIsPermittedHost = SetCBIsPermittedHost;
+ pIf->SetCBOpenLstnSocks = SetCBOpenLstnSocks;
+ pIf->SetCBRcvData = SetCBRcvData;
+ pIf->SetCBOnListenDeinit = SetCBOnListenDeinit;
+ pIf->SetCBOnSessAccept = SetCBOnSessAccept;
+ pIf->SetCBOnSessConstructFinalize = SetCBOnSessConstructFinalize;
+ pIf->SetCBOnSessDestruct = SetCBOnSessDestruct;
+ pIf->SetCBOnDestruct = SetCBOnDestruct;
+ pIf->SetCBOnRegularClose = SetCBOnRegularClose;
+ pIf->SetCBOnErrClose = SetCBOnErrClose;
+ pIf->SetOnMsgReceive = SetOnMsgReceive;
+ pIf->SetLinuxLikeRatelimiters = SetLinuxLikeRatelimiters;
+ pIf->SetNotificationOnRemoteClose = SetNotificationOnRemoteClose;
+ pIf->SetNotificationOnRemoteOpen = SetNotificationOnRemoteOpen;
+ pIf->SetPreserveCase = SetPreserveCase;
+ pIf->SetDrvrCheckExtendedKeyUsage = SetDrvrCheckExtendedKeyUsage;
+ pIf->SetDrvrPrioritizeSAN = SetDrvrPrioritizeSAN;
+ pIf->SetDrvrTlsVerifyDepth = SetDrvrTlsVerifyDepth;
+
+finalize_it:
+ENDobjQueryInterface(tcpsrv)
+
+
+/* exit our class
+ * rgerhards, 2008-03-10
+ */
+BEGINObjClassExit(tcpsrv, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */
+CODESTARTObjClassExit(tcpsrv)
+ /* release objects we no longer need */
+ objRelease(tcps_sess, DONT_LOAD_LIB);
+ objRelease(conf, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(netstrms, DONT_LOAD_LIB);
+ objRelease(nssel, DONT_LOAD_LIB);
+ objRelease(netstrm, LM_NETSTRMS_FILENAME);
+ objRelease(net, LM_NET_FILENAME);
+ENDObjClassExit(tcpsrv)
+
+
+/* Initialize our class. Must be called as the very first method
+ * before anything else is called inside this class.
+ * rgerhards, 2008-02-29
+ */
+BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE class also in END MACRO! */
+ /* request objects we use */
+ CHKiRet(objUse(net, LM_NET_FILENAME));
+ CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
+ CHKiRet(objUse(netstrm, DONT_LOAD_LIB));
+ CHKiRet(objUse(nssel, DONT_LOAD_LIB));
+ CHKiRet(objUse(nspoll, DONT_LOAD_LIB));
+ CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB));
+ CHKiRet(objUse(conf, CORE_COMPONENT));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
+
+ /* set our own handlers */
+ OBJSetMethodHandler(objMethod_DEBUGPRINT, tcpsrvDebugPrint);
+ OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, tcpsrvConstructFinalize);
+ENDObjClassInit(tcpsrv)
+
+
+/* start worker threads
+ * Important: if we fork, this MUST be done AFTER forking
+ */
+static void
+startWorkerPool(void)
+{
+ int i;
+ int r;
+ pthread_attr_t sessThrdAttr;
+
+ /* We need to temporarily block all signals because the new thread
+ * inherits our signal mask. There is a race if we do not block them
+ * now, and we have seen in practice that this race causes grief.
+ * So we 1. save the current set, 2. block evertyhing, 3. start
+ * threads, and 4 reset the current set to saved state.
+ * rgerhards, 2019-08-16
+ */
+ sigset_t sigSet, sigSetSave;
+ sigfillset(&sigSet);
+ pthread_sigmask(SIG_SETMASK, &sigSet, &sigSetSave);
+
+ wrkrRunning = 0;
+ pthread_cond_init(&wrkrIdle, NULL);
+ pthread_attr_init(&sessThrdAttr);
+ pthread_attr_setstacksize(&sessThrdAttr, 4096*1024);
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].pSrv = NULL;
+ wrkrInfo[i].numCalled = 0;
+ r = pthread_create(&wrkrInfo[i].tid, &sessThrdAttr, wrkr, &(wrkrInfo[i]));
+ if(r == 0) {
+ wrkrInfo[i].enabled = 1;
+ } else {
+ LogError(r, NO_ERRCODE, "tcpsrv error creating thread");
+ }
+ }
+ pthread_attr_destroy(&sessThrdAttr);
+ pthread_sigmask(SIG_SETMASK, &sigSetSave, NULL);
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ pthread_mutex_lock(&wrkrMut);
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_mutex_unlock(&wrkrMut);
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrIdle);
+}
+
+
+/* --------------- here now comes the plumbing that makes as a library module --------------- */
+
+BEGINmodExit
+CODESTARTmodExit
+ if(bWrkrRunning) {
+ stopWorkerPool();
+ bWrkrRunning = 0;
+ }
+ /* de-init in reverse order! */
+ tcpsrvClassExit();
+ tcps_sessClassExit();
+ pthread_mutex_destroy(&wrkrMut);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_LIB_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+ /* we just init the worker mutex, but do not start the workers themselves. This is deferred
+ * to the first call of Run(). Reasons for this:
+ * 1. depending on load order, tcpsrv gets loaded during rsyslog startup BEFORE
+ * it forks, in which case the workers would be running in the then-killed parent,
+ * leading to a defuncnt child (we actually had this bug).
+ * 2. depending on circumstances, Run() would possibly never be called, in which case
+ * the worker threads would be totally useless.
+ * Note that in order to guarantee a non-racy worker start, we need to guard the
+ * startup sequence by a mutex, which is why we init it here (no problem with fork()
+ * in this case as the mutex is a pure-memory structure).
+ * rgerhards, 2012-05-18
+ */
+ pthread_mutex_init(&wrkrMut, NULL);
+ bWrkrRunning = 0;
+
+ /* Initialize all classes that are in our module - this includes ourselfs */
+ CHKiRet(tcps_sessClassInit(pModInfo));
+ CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
+ENDmodInit