/* imptcp.c * This is a native implementation of plain tcp. It is intentionally * duplicate work (imtcp). The intent is to gain very fast and simple * native ptcp support, utilizing the best interfaces Linux (no cross- * platform intended!) has to offer. * * Note that in this module we try out some new naming conventions, * so it may look a bit "different" from the other modules. We are * investigating if removing prefixes can help make code more readable. * * File begun on 2010-08-10 by RGerhards * * Copyright 2007-2022 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * -or- * see COPYING.ASL20 in the source distribution * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "config.h" #if !defined(HAVE_EPOLL_CREATE) # error imptcp requires OS support for epoll - can not build /* imptcp gains speed by using modern Linux capabilities. As such, * it can only be build on platforms supporting the epoll API. 
*/ #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if HAVE_FCNTL_H #include #endif #ifdef HAVE_SYS_PRCTL_H # include #endif #include "rsyslog.h" #include "cfsysline.h" #include "prop.h" #include "dirty.h" #include "module-template.h" #include "unicode-helper.h" #include "glbl.h" #include "errmsg.h" #include "srUtils.h" #include "datetime.h" #include "ruleset.h" #include "msg.h" #include "parserif.h" #include "statsobj.h" #include "ratelimit.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ #define TCPSRV_NO_ADDTL_DELIMITER -1 /* specifies that no additional delimiter is to be used in TCP framing */ MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP MODULE_CNFNAME("imptcp") /* static data */ DEF_IMOD_STATIC_DATA DEFobjCurrIf(glbl) DEFobjCurrIf(net) DEFobjCurrIf(prop) DEFobjCurrIf(datetime) DEFobjCurrIf(ruleset) DEFobjCurrIf(statsobj) /* forward references */ static void * wrkr(void *myself); /* unfortunately, on some platforms EAGAIN == EWOULDBOLOCK and so checking against * both of them generates a gcc 8 warning for this reason. We do not want to disable * the warning, so we need to work around this via a macro. 
*/ #if EAGAIN == EWOULDBLOCK #define CHK_EAGAIN_EWOULDBLOCK (errno == EAGAIN) #else #define CHK_EAGAIN_EWOULDBLOCK (errno == EAGAIN | errno == EWOULDBLOCK) #endif /* #if EAGAIN == EWOULDBOLOCK */ #define DFLT_wrkrMax 2 #define DFLT_inlineDispatchThreshold 1 #define COMPRESS_NEVER 0 #define COMPRESS_SINGLE_MSG 1 /* old, single-message compression */ /* all other settings are for stream-compression */ #define COMPRESS_STREAM_ALWAYS 2 /* config settings */ typedef struct configSettings_s { int bKeepAlive; /* support keep-alive packets */ int iKeepAliveIntvl; int iKeepAliveProbes; int iKeepAliveTime; int bEmitMsgOnClose; /* emit an informational message on close by remote peer */ int bEmitMsgOnOpen; int bSuppOctetFram; /* support octet-counted framing? */ int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */ int maxFrameSize; uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ uchar *lstnIP; /* which IP we should listen on? */ uchar *pszBindRuleset; int wrkrMax; /* max number of workers (actually "helper workers") */ int iTCPSessMax; /* max open connections per instance */ } configSettings_t; static configSettings_t cs; struct instanceConf_s { int bKeepAlive; /* support keep-alive packets */ int iKeepAliveIntvl; int iKeepAliveProbes; int iKeepAliveTime; int bEmitMsgOnClose; int bEmitMsgOnOpen; int bSuppOctetFram; /* support octet-counted framing? 
*/ int bSPFramingFix; int iAddtlFrameDelim; int socketBacklog; sbool multiLine; uint8_t compressionMode; uchar *pszBindPort; /* port to bind to */ uchar *pszLstnPortFileName; /* Name of the file with dynamic port used by testbench*/ uchar *pszBindAddr; /* IP to bind socket to */ uchar *pszBindPath; /* Path to bind socket to */ uchar *pszBindRuleset; /* name of ruleset to bind to */ uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ int fCreateMode; /* file creation mode for open() */ uid_t fileUID; /* IDs for creation */ gid_t fileGID; int maxFrameSize; int bFailOnPerms; /* fail creation if chown fails? */ ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ uchar *dfltTZ; sbool bUnlink; sbool discardTruncatedMsg; sbool flowControl; unsigned int ratelimitInterval; unsigned int ratelimitBurst; uchar *startRegex; regex_t start_preg; /* compiled version of startRegex */ int iTCPSessMax; /* max open connections */ struct instanceConf_s *next; }; struct modConfData_s { rsconf_t *pConf; /* our overall config object */ instanceConf_t *root, *tail; int wrkrMax; int bProcessOnPoller; int iTCPSessMax; sbool configSetViaV2Method; }; static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ /* module-global parameters */ static struct cnfparamdescr modpdescr[] = { { "threads", eCmdHdlrPositiveInt, 0 }, { "maxsessions", eCmdHdlrInt, 0 }, { "processOnPoller", eCmdHdlrBinary, 0 } }; static struct cnfparamblk modpblk = { CNFPARAMBLK_VERSION, sizeof(modpdescr)/sizeof(struct cnfparamdescr), modpdescr }; /* input instance parameters */ static struct cnfparamdescr inppdescr[] = { { "port", eCmdHdlrString, 0 }, /* legacy: InputTCPServerRun */ { "address", eCmdHdlrString, 0 }, { "path", eCmdHdlrString, 0 }, { "unlink", eCmdHdlrBinary, 0 }, { "discardtruncatedmsg", 
eCmdHdlrBinary, 0 }, { "fileowner", eCmdHdlrUID, 0 }, { "fileownernum", eCmdHdlrInt, 0 }, { "filegroup", eCmdHdlrGID, 0 }, { "filegroupnum", eCmdHdlrInt, 0 }, { "filecreatemode", eCmdHdlrFileCreateMode, 0 }, { "failonchownfailure", eCmdHdlrBinary, 0 }, { "flowcontrol", eCmdHdlrBinary, 0 }, { "name", eCmdHdlrString, 0 }, { "maxframesize", eCmdHdlrInt, 0 }, { "framing.delimiter.regex", eCmdHdlrString, 0 }, { "ruleset", eCmdHdlrString, 0 }, { "defaulttz", eCmdHdlrString, 0 }, { "supportoctetcountedframing", eCmdHdlrBinary, 0 }, { "framingfix.cisco.asa", eCmdHdlrBinary, 0 }, { "maxsessions", eCmdHdlrInt, 0 }, { "notifyonconnectionclose", eCmdHdlrBinary, 0 }, { "notifyonconnectionopen", eCmdHdlrBinary, 0 }, { "compression.mode", eCmdHdlrGetWord, 0 }, { "keepalive", eCmdHdlrBinary, 0 }, { "keepalive.probes", eCmdHdlrInt, 0 }, { "keepalive.time", eCmdHdlrInt, 0 }, { "keepalive.interval", eCmdHdlrInt, 0 }, { "addtlframedelimiter", eCmdHdlrInt, 0 }, { "ratelimit.interval", eCmdHdlrInt, 0 }, { "ratelimit.burst", eCmdHdlrInt, 0 }, { "multiline", eCmdHdlrBinary, 0 }, { "listenportfilename", eCmdHdlrString, 0 }, { "socketbacklog", eCmdHdlrInt, 0 } }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, sizeof(inppdescr)/sizeof(struct cnfparamdescr), inppdescr }; #include "im-helper.h" /* must be included AFTER the type definitions! */ static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ /* data elements describing our running config */ typedef struct ptcpsrv_s ptcpsrv_t; typedef struct ptcplstn_s ptcplstn_t; typedef struct ptcpsess_s ptcpsess_t; typedef struct epolld_s epolld_t; /* the ptcp server (listener) object * Note that the object contains support for forming a linked list * of them. It does not make sense to do this seperately. */ struct ptcpsrv_s { ptcpsrv_t *pNext; /* linked list maintenance */ uchar *port; /* Port to listen to */ uchar *lstnIP; /* which IP we should listen on? 
*/ uchar *path; /* Use a unix socket instead */ int fCreateMode; /* file creation mode for open() */ uid_t fileUID; /* IDs for creation */ gid_t fileGID; int maxFrameSize; int bFailOnPerms; /* fail creation if chown fails? */ sbool bUnixSocket; int socketBacklog; uchar *pszLstnPortFileName; int iAddtlFrameDelim; sbool multiLine; int iKeepAliveIntvl; int iKeepAliveProbes; int iKeepAliveTime; uint8_t compressionMode; uchar *pszInputName; uchar *dfltTZ; prop_t *pInputName; /* InputName in (fast to process) property format */ ruleset_t *pRuleset; ptcplstn_t *pLstn; /* root of our listeners */ ptcpsess_t *pSess; /* root of our sessions */ int iTCPSessCnt; int iTCPSessMax; pthread_mutex_t mutSessLst; sbool bKeepAlive; /* support keep-alive packets */ sbool bEmitMsgOnClose; sbool bEmitMsgOnOpen; sbool bSuppOctetFram; sbool bSPFramingFix; sbool bUnlink; sbool discardTruncatedMsg; sbool flowControl; ratelimit_t *ratelimiter; instanceConf_t *inst; }; /* the ptcp session object. Describes a single active session. * includes support for doubly-linked list. */ struct ptcpsess_s { ptcplstn_t *pLstn; /* our listener */ ptcpsess_t *prev, *next; int sock; epolld_t *epd; sbool bzInitDone; /* did we do an init of zstrm already? */ z_stream zstrm; /* zip stream to use for tcp compression */ uint8_t compressionMode; int iMsg; /* index of next char to store in msg */ int iCurrLine; /* 2nd char of current line in regex framing mode */ int bAtStrtOfFram; /* are we at the very beginning of a new frame? 
*/ sbool bSuppOctetFram; /**< copy from listener, to speed up access */ sbool bSPFramingFix; enum { eAtStrtFram, eInOctetCnt, eInMsg, eInMsgTruncation } inputState; /* our current state */ int iOctetsRemain; /* Number of Octets remaining in message */ TCPFRAMINGMODE eFraming; uchar *pMsg; /* message (fragment) received */ uchar *pMsg_save; /* message (fragment) save area in regex framing mode */ prop_t *peerName; /* host name we received messages from */ prop_t *peerIP; const uchar *startRegex;/* cache for performance reasons */ int iAddtlFrameDelim; /* cache for performance reasons */ }; /* the ptcp listener object. Describes a single active listener. */ struct ptcplstn_s { ptcpsrv_t *pSrv; /* our server */ ptcplstn_t *prev, *next; int sock; sbool bSuppOctetFram; sbool bSPFramingFix; epolld_t *epd; statsobj_t *stats; /* listener stats */ intctr_t rcvdBytes; intctr_t rcvdDecompressed; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) STATSCOUNTER_DEF(ctrSessOpen, mutCtrSessOpen) STATSCOUNTER_DEF(ctrSessOpenErr, mutCtrSessOpenErr) STATSCOUNTER_DEF(ctrSessClose, mutCtrSessClose) DEF_ATOMIC_HELPER_MUT64(mut_rcvdBytes) }; /* The following structure controls the worker threads. Global data is * needed for their access. */ static struct wrkrInfo_s { pthread_t tid; /* the worker's thread ID */ long long unsigned numCalled; /* how often was this called */ int wrkrIdx; /* index for this worker - shortcut for thread name */ } *wrkrInfo; static int wrkrRunning; /* type of object stored in epoll descriptor */ typedef enum { epolld_lstn, epolld_sess } epolld_type_t; /* an epoll descriptor. contains all information necessary to process * the result of epoll. 
*/ struct epolld_s { epolld_type_t typ; void *ptr; int sock; struct epoll_event ev; }; typedef struct io_req_s { STAILQ_ENTRY(io_req_s) link; epolld_t *epd; } io_req_t; typedef struct io_q_s { STAILQ_HEAD(ioq_s, io_req_s) q; STATSCOUNTER_DEF(ctrEnq, mutCtrEnq); int ctrMaxSz; //TODO: discuss potential problems around concurrent reads and writes int sz; //current q size statsobj_t *stats; pthread_mutex_t mut; pthread_cond_t wakeup_worker; } io_q_t; /* global data */ pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */ static ptcpsrv_t *pSrvRoot = NULL; static int epollfd = -1; /* (sole) descriptor for epoll */ static int iMaxLine; /* maximum size of a single message */ static io_q_t io_q; /* forward definitions */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6); /* function to suppress TSAN known-good case * We do not use a mutex an epd, but we do always access it in * pure sequence. Adding a mutex just to cover this "cosmetic" * would result in uncesseary performance penalty. */ static void ATTR_NONNULL() imptcp_destruct_epd(ptcpsess_t *const pSess) { free(pSess->epd); pSess->epd = NULL; } /* some simple constructors/destructors */ static void ATTR_NONNULL() destructSess(ptcpsess_t *const pSess) { imptcp_destruct_epd(pSess); free(pSess->pMsg_save); free(pSess->pMsg); prop.Destruct(&pSess->peerName); prop.Destruct(&pSess->peerIP); /* TODO: make these inits compile-time switch depending: */ pSess->pMsg = NULL; free(pSess); } /* remove session from server */ static void unlinkSess(ptcpsess_t *pSess) { ptcpsrv_t *pSrv = pSess->pLstn->pSrv; pthread_mutex_lock(&pSrv->mutSessLst); pSrv->iTCPSessCnt--; /* finally unlink session from structures */ if(pSess->next != NULL) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { /* need to update root! 
*/ pSrv->pSess = pSess->next; } else { pSess->prev->next = pSess->next; } pthread_mutex_unlock(&pSrv->mutSessLst); } static void destructSrv(ptcpsrv_t *pSrv) { if(pSrv->ratelimiter != NULL) ratelimitDestruct(pSrv->ratelimiter); if(pSrv->pInputName != NULL) prop.Destruct(&pSrv->pInputName); pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->pszInputName); free(pSrv->port); free(pSrv->pszLstnPortFileName); free(pSrv->path); free(pSrv->lstnIP); free(pSrv); } /****************************************** TCP SUPPORT FUNCTIONS ***********************************/ /* We may later think about moving this into a helper library again. But the whole point * so far was to keep everything related close togehter. -- rgerhards, 2010-08-10 */ static rsRetVal startupUXSrv(ptcpsrv_t *pSrv) { DEFiRet; int sock; int sockflags; struct sockaddr_un local; uchar *path = pSrv->path == NULL ? UCHAR_CONSTANT("") : pSrv->path; DBGPRINTF("imptcp: creating listen unix socket at %s\n", path); sock = socket(AF_UNIX, SOCK_STREAM, 0); if(sock < 0) { LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: error creating unix socket"); ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } local.sun_family = AF_UNIX; strncpy(local.sun_path, (char*) path, sizeof(local.sun_path)-1); if (pSrv->bUnlink) { unlink(local.sun_path); } /* We use non-blocking IO! */ if ((sockflags = fcntl(sock, F_GETFL)) != -1) { sockflags |= O_NONBLOCK; /* SETFL could fail too, so get it caught by the subsequent error check. 
*/ sockflags = fcntl(sock, F_SETFL, sockflags); } if (sockflags == -1) { LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: error setting fcntl(O_NONBLOCK) on unix socket"); ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } if (bind(sock, (struct sockaddr *)&local, SUN_LEN(&local)) < 0) { LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: error while binding unix socket"); ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } if (listen(sock, pSrv->socketBacklog) < 0) { LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: unix socket listen error"); ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } if(chown(local.sun_path, pSrv->fileUID, pSrv->fileGID) != 0) { if(pSrv->bFailOnPerms) { LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: unix socket chown error"); ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } } if(chmod(local.sun_path, pSrv->fCreateMode) != 0) { if(pSrv->bFailOnPerms) { LogError(errno, RS_RET_ERR_CRE_AFUX, "imptcp: unix socket chmod error"); ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); } } CHKiRet(addLstn(pSrv, sock, 0)); finalize_it: if (iRet != RS_RET_OK) { if (sock != -1) { close(sock); } } RETiRet; } /* Start up a server. That means all of its listeners are created. * Does NOT yet accept/process any incoming data (but binds ports). Hint: this * code is to be executed before dropping privileges. */ PRAGMA_DIAGNOSTIC_PUSH static rsRetVal startupSrv(ptcpsrv_t *pSrv) { DEFiRet; int error, maxs, on = 1; int sock = -1; int numSocks; int sockflags; struct addrinfo hints, *res = NULL, *r; uchar *lstnIP; int isIPv6 = 0; int port_override = 0; /* if dyn port (0): use this for actually bound port */ union { struct sockaddr *sa; struct sockaddr_in *ipv4; struct sockaddr_in6 *ipv6; } savecast; if (pSrv->bUnixSocket) { return startupUXSrv(pSrv); } lstnIP = pSrv->lstnIP == NULL ? 
UCHAR_CONSTANT("") : pSrv->lstnIP; DBGPRINTF("imptcp: creating listen socket on server '%s', port %s\n", lstnIP, pSrv->port); memset(&hints, 0, sizeof(hints)); hints.ai_flags = AI_PASSIVE; hints.ai_family = glbl.GetDefPFFamily(runModConf->pConf); hints.ai_socktype = SOCK_STREAM; error = getaddrinfo((char*)pSrv->lstnIP, (char*) pSrv->port, &hints, &res); if(error) { DBGPRINTF("error %d querying server '%s', port '%s'\n", error, pSrv->lstnIP, pSrv->port); ABORT_FINALIZE(RS_RET_INVALID_PORT); } /* Count max number of sockets we may open */ for(maxs = 0, r = res; r != NULL ; r = r->ai_next, maxs++) { /* EMPTY */; } numSocks = 0; /* num of sockets counter at start of array */ for(r = res; r != NULL ; r = r->ai_next) { if(port_override != 0) { savecast.sa = (struct sockaddr*)r->ai_addr; if(r->ai_family == AF_INET6) { savecast.ipv6->sin6_port = port_override; } else { savecast.ipv4->sin_port = port_override; } } sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol); if(sock < 0) { if(!(r->ai_family == PF_INET6 && errno == EAFNOSUPPORT)) { DBGPRINTF("error %d creating tcp listen socket", errno); /* it is debatable if PF_INET with EAFNOSUPPORT should * also be ignored... */ } continue; } if(r->ai_family == AF_INET6) { isIPv6 = 1; #ifdef IPV6_V6ONLY int iOn = 1; if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&iOn, sizeof (iOn)) < 0) { close(sock); sock = -1; continue; } #endif } else { isIPv6 = 0; } if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) { DBGPRINTF("error %d setting tcp socket option\n", errno); close(sock); sock = -1; continue; } /* We use non-blocking IO! */ if((sockflags = fcntl(sock, F_GETFL)) != -1) { sockflags |= O_NONBLOCK; /* SETFL could fail too, so get it caught by the subsequent * error check. */ sockflags = fcntl(sock, F_SETFL, sockflags); } if(sockflags == -1) { DBGPRINTF("error %d setting fcntl(O_NONBLOCK) on tcp socket", errno); close(sock); sock = -1; continue; } /* We need to enable BSD compatibility. 
Otherwise an attacker * could flood our log files by sending us tons of ICMP errors. */ #if !defined (_AIX) #ifndef BSD if(net.should_use_so_bsdcompat()) { if (setsockopt(sock, SOL_SOCKET, SO_BSDCOMPAT, (char *) &on, sizeof(on)) < 0) { LogError(errno, NO_ERRCODE, "TCP setsockopt(BSDCOMPAT)"); close(sock); sock = -1; continue; } } #endif #endif if( (bind(sock, r->ai_addr, r->ai_addrlen) < 0) #ifndef IPV6_V6ONLY && (errno != EADDRINUSE) #endif ) { /* TODO: check if *we* bound the socket - else we *have* an error! */ LogError(errno, NO_ERRCODE, "Error while binding tcp socket"); close(sock); sock = -1; continue; } /* if we bind to dynamic port (port 0 given), we will do so consistently. Thus * once we got a dynamic port, we will keep it and use it for other protocols * as well. As of my understanding, this should always work as the OS does not * pick a port that is used by some protocol (well, at least this looks very * unlikely...). If our asusmption is wrong, we should iterate until we find a * combination that works - it is very unusual to have the same service listen * on differnt ports on IPv4 and IPv6. */ savecast.sa = (struct sockaddr*)r->ai_addr; const int currport = (isIPv6) ? savecast.ipv6->sin6_port : savecast.ipv4->sin_port; if(currport == 0) { socklen_t socklen_r = r->ai_addrlen; if(getsockname(sock, r->ai_addr, &socklen_r) == -1) { LogError(errno, NO_ERRCODE, "nsd_ptcp: ListenPortFileName: getsockname:" "error while trying to get socket"); } r->ai_addrlen = socklen_r; savecast.sa = (struct sockaddr*)r->ai_addr; port_override = (isIPv6) ? 
savecast.ipv6->sin6_port : savecast.ipv4->sin_port; if(pSrv->pszLstnPortFileName != NULL) { FILE *fp; if((fp = fopen((const char*)pSrv->pszLstnPortFileName, "w+")) == NULL) { LogError(errno, RS_RET_IO_ERROR, "imptcp: ListenPortFileName: " "error while trying to open file"); ABORT_FINALIZE(RS_RET_IO_ERROR); } if(isIPv6) { fprintf(fp, "%d", ntohs(savecast.ipv6->sin6_port)); } else { fprintf(fp, "%d", ntohs(savecast.ipv4->sin_port)); } fclose(fp); } } if(listen(sock, pSrv->socketBacklog) < 0) { LogError(errno, NO_ERRCODE, "imptcp error listening on port"); DBGPRINTF("tcp listen error %d, suspending\n", errno); close(sock); sock = -1; continue; } /* if we reach this point, we were able to obtain a valid socket, so we can * create our listener object. -- rgerhards, 2010-08-10 */ CHKiRet(addLstn(pSrv, sock, isIPv6)); ++numSocks; } if(numSocks != maxs) { DBGPRINTF("We could initialize %d TCP listen sockets out of %d we received " "- this may or may not be an error indication.\n", numSocks, maxs); } if(numSocks == 0) { DBGPRINTF("No TCP listen sockets could successfully be initialized"); ABORT_FINALIZE(RS_RET_COULD_NOT_BIND); } finalize_it: if(res != NULL) { freeaddrinfo(res); } if(iRet != RS_RET_OK) { if(sock != -1) { close(sock); } } RETiRet; } PRAGMA_DIAGNOSTIC_POP /* Set pRemHost based on the address provided. This is to be called upon accept()ing * a connection request. It must be provided by the socket we received the * message on as well as a NI_MAXHOST size large character buffer for the FQDN. * Please see http://www.hmug.org/man/3/getnameinfo.php (under Caveats) * for some explanation of the code found below. If we detect a malicious * hostname, we return RS_RET_MALICIOUS_HNAME and let the caller decide * on how to deal with that. 
* rgerhards, 2008-03-31 */ static rsRetVal getPeerNames(prop_t **peerName, prop_t **peerIP, struct sockaddr *pAddr, sbool bUXServer) { int error; uchar szIP[NI_MAXHOST+1] = ""; uchar szHname[NI_MAXHOST+1] = ""; struct addrinfo hints, *res; sbool bMaliciousHName = 0; DEFiRet; *peerName = NULL; *peerIP = NULL; if (bUXServer) { strncpy((char *) szHname, (char *) glbl.GetLocalHostName(), NI_MAXHOST); strncpy((char *) szIP, (char *) glbl.GetLocalHostIP(), NI_MAXHOST); szHname[NI_MAXHOST] = '\0'; szIP[NI_MAXHOST] = '\0'; } else { error = getnameinfo(pAddr, SALEN(pAddr), (char *) szIP, sizeof(szIP), NULL, 0, NI_NUMERICHOST); if (error) { DBGPRINTF("Malformed from address %s\n", gai_strerror(error)); strcpy((char *) szHname, "???"); strcpy((char *) szIP, "???"); ABORT_FINALIZE(RS_RET_INVALID_HNAME); } if (!glbl.GetDisableDNS(runConf)) { error = getnameinfo(pAddr, SALEN(pAddr), (char *) szHname, NI_MAXHOST, NULL, 0, NI_NAMEREQD); if (error == 0) { memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_flags = AI_NUMERICHOST; hints.ai_socktype = SOCK_STREAM; /* we now do a lookup once again. This one should fail, * because we should not have obtained a non-numeric address. If * we got a numeric one, someone messed with DNS! */ if (getaddrinfo((char *) szHname, NULL, &hints, &res) == 0) { freeaddrinfo(res); /* OK, we know we have evil, so let's indicate this to our caller */ snprintf((char *) szHname, sizeof(szHname), "[MALICIOUS:IP=%s]", szIP); DBGPRINTF("Malicious PTR record, IP = \"%s\" HOST = \"%s\"", szIP, szHname); bMaliciousHName = 1; } } else { strcpy((char *) szHname, (char *) szIP); } } else { strcpy((char *) szHname, (char *) szIP); } } /* We now have the names, so now let's allocate memory and store them permanently. 
*/ CHKiRet(prop.Construct(peerName)); CHKiRet(prop.SetString(*peerName, szHname, ustrlen(szHname))); CHKiRet(prop.ConstructFinalize(*peerName)); CHKiRet(prop.Construct(peerIP)); CHKiRet(prop.SetString(*peerIP, szIP, ustrlen(szIP))); CHKiRet(prop.ConstructFinalize(*peerIP)); finalize_it: if(iRet != RS_RET_OK) { if(*peerName != NULL) prop.Destruct(peerName); if(*peerIP != NULL) prop.Destruct(peerIP); } if(bMaliciousHName) iRet = RS_RET_MALICIOUS_HNAME; RETiRet; } /* Enable KEEPALIVE handling on the socket. */ static rsRetVal EnableKeepAlive(ptcplstn_t *pLstn, int sock) { int ret; int optval; socklen_t optlen; DEFiRet; optval = 1; optlen = sizeof(optval); ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); if(ret < 0) { dbgprintf("EnableKeepAlive socket call returns error %d\n", ret); ABORT_FINALIZE(RS_RET_ERR); } # if defined(TCP_KEEPCNT) if(pLstn->pSrv->iKeepAliveProbes > 0) { optval = pLstn->pSrv->iKeepAliveProbes; optlen = sizeof(optval); ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, &optval, optlen); } else { ret = 0; } # else ret = -1; # endif if(ret < 0) { LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored"); } # if defined(TCP_KEEPCNT) if(pLstn->pSrv->iKeepAliveTime > 0) { optval = pLstn->pSrv->iKeepAliveTime; optlen = sizeof(optval); ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, &optval, optlen); } else { ret = 0; } # else ret = -1; # endif if(ret < 0) { LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored"); } # if defined(TCP_KEEPCNT) if(pLstn->pSrv->iKeepAliveIntvl > 0) { optval = pLstn->pSrv->iKeepAliveIntvl; optlen = sizeof(optval); ret = setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, &optval, optlen); } else { ret = 0; } # else ret = -1; # endif if(ret < 0) { LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored"); } dbgprintf("KEEPALIVE enabled for socket %d\n", sock); finalize_it: RETiRet; } /* accept an incoming connection request * rgerhards, 2008-04-22 */ static rsRetVal 
ATTR_NONNULL() AcceptConnReq(ptcplstn_t *const pLstn, int *const newSock, prop_t **peerName, prop_t **peerIP) { int sockflags; struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); int iNewSock = -1; DEFiRet; *peerName = NULL; /* ensure we know if we don't have one! */ iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen); if(iNewSock < 0) { if(CHK_EAGAIN_EWOULDBLOCK || errno == EMFILE) ABORT_FINALIZE(RS_RET_NO_MORE_DATA); LogError(errno, RS_RET_ACCEPT_ERR, "error accepting connection " "on listen socket %d", pLstn->sock); ABORT_FINALIZE(RS_RET_ACCEPT_ERR); } if(addrlen == 0) { LogError(errno, RS_RET_ACCEPT_ERR, "AcceptConnReq could not obtain " "remote peer identification on listen socket %d", pLstn->sock); } if(pLstn->pSrv->bKeepAlive) EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */ CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr *) &addr, pLstn->pSrv->bUnixSocket)); /* set the new socket to non-blocking IO */ if((sockflags = fcntl(iNewSock, F_GETFL)) != -1) { sockflags |= O_NONBLOCK; /* SETFL could fail too, so get it caught by the subsequent * error check. */ sockflags = fcntl(iNewSock, F_SETFL, sockflags); } if(sockflags == -1) { LogError(errno, RS_RET_IO_ERROR, "error setting fcntl(O_NONBLOCK) on " "tcp socket %d", iNewSock); prop.Destruct(peerName); prop.Destruct(peerIP); ABORT_FINALIZE(RS_RET_IO_ERROR); } if(pLstn->pSrv->bEmitMsgOnOpen) { LogMsg(0, RS_RET_NO_ERRCODE, LOG_INFO, "imptcp: connection established with host: %s", propGetSzStr(*peerName)); } STATSCOUNTER_INC(pLstn->ctrSessOpen, pLstn->mutCtrSessOpen); *newSock = iNewSock; finalize_it: DBGPRINTF("iRet: %d\n", iRet); if(iRet != RS_RET_OK) { if(iRet != RS_RET_NO_MORE_DATA && pLstn->pSrv->bEmitMsgOnOpen) { LogError(0, NO_ERRCODE, "imptcp: connection could not be " "established with host: %s", *peerName == NULL ? 
"(could not query)" : (const char*)propGetSzStr(*peerName)); } STATSCOUNTER_INC(pLstn->ctrSessOpenErr, pLstn->mutCtrSessOpenErr); /* the close may be redundant, but that doesn't hurt... */ if(iNewSock != -1) close(iNewSock); } RETiRet; } /* This is a helper for submitting the message to the rsyslog core. * It does some common processing, including resetting the various * state variables to a "processed" state. * Note that this function is also called if we had a buffer overflow * due to a too-long message. So far, there is no indication this * happened and it may be worth thinking about different handling * of this case (what obviously would require a change to this * function or some related code). * rgerhards, 2009-04-23 * EXTRACT from tcps_sess.c */ static rsRetVal doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { smsg_t *pMsg; ptcpsrv_t *pSrv; DEFiRet; if(pThis->iMsg == 0) { DBGPRINTF("discarding zero-sized message\n"); FINALIZE; } pSrv = pThis->pLstn->pSrv; /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); MsgSetInputName(pMsg, pSrv->pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); if(pSrv->dfltTZ != NULL) MsgSetDfltTZ(pMsg, (char*) pSrv->dfltTZ); MsgSetFlowControlType(pMsg, pSrv->flowControl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pThis->peerName); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->peerIP)); MsgSetRuleset(pMsg, pSrv->pRuleset); STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg); finalize_it: /* reset status variables */ pThis->bAtStrtOfFram = 1; pThis->iMsg = 0; RETiRet; } /* process the data received, special case if the framing is specified via * a regex. For more info see processDataRcvd(). 
*/
/* Handles one input character (**buff) in regex framing mode.
 * Characters are appended to pThis->pMsg. After each '\n', iCurrLine is set
 * to the start of the next line; for every other character the compiled
 * start regex is applied to the current line (from iCurrLine on). On a match,
 * everything before that line (minus the separating '\n') is submitted as one
 * message and the matching line becomes the start of the next message.
 * If 2*iMaxLine bytes accumulate without a match, the buffer is submitted as
 * is and a fresh frame is started.
 */
static rsRetVal ATTR_NONNULL()
processDataRcvd_regexFraming(ptcpsess_t *const __restrict__ pThis,
	char **const buff,
	struct syslogTime *const stTime,
	const time_t ttGenTime,
	multi_submit_t *const pMultiSub,
	unsigned *const __restrict__ pnMsgs)
{
	DEFiRet;
	const instanceConf_t *const inst = pThis->pLstn->pSrv->inst;
	assert(inst->startRegex != NULL);
	const char c = **buff;

	pThis->pMsg[pThis->iMsg++] = c;
	pThis->pMsg[pThis->iMsg] = '\0';

	if(pThis->iMsg >= 2*iMaxLine) {
		LogError(0, RS_RET_OVERSIZE_MSG, "imptcp: more then double max message size (%d) "
			"received without finding frame terminator via regex - assuming "
			"end of frame now.", pThis->iMsg+1);
		doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub);
		++(*pnMsgs);
		/* c is already part of the flushed frame. Start a clean, empty frame and
		 * stop here: neither run the regex over the stale old buffer contents,
		 * nor let a trailing '\n' set iCurrLine to 0 (which would later make a
		 * match compute a negative message length).
		 */
		pThis->iMsg = 0;
		pThis->pMsg[0] = '\0';
		pThis->iCurrLine = 1;
		FINALIZE;
	}

	if(c == '\n') {
		pThis->iCurrLine = pThis->iMsg;
	} else {
		const int isMatch = !regexec(&inst->start_preg, (char*)pThis->pMsg+pThis->iCurrLine, 0, NULL, 0);
		if(isMatch) {
			DBGPRINTF("regex match (%d), framing line: %s\n", pThis->iCurrLine, pThis->pMsg);
			strcpy((char*)pThis->pMsg_save, (char*) pThis->pMsg+pThis->iCurrLine);
			/* previous frame = everything before the matching line, without its '\n';
			 * never negative (doSubmitMsg discards a zero-length frame)
			 */
			pThis->iMsg = (pThis->iCurrLine > 0) ? pThis->iCurrLine - 1 : 0;
			doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub);
			++(*pnMsgs);
			strcpy((char*)pThis->pMsg, (char*)pThis->pMsg_save);
			pThis->iMsg = ustrlen(pThis->pMsg_save);
			pThis->iCurrLine = 1;
		}
	}

finalize_it:
	RETiRet;
}

/* process the data received. As TCP is stream based, we need to process the
 * data inside a state machine. The actual data received is passed in byte-by-byte
 * from DataRcvd, and this function here compiles messages from them and submits
 * the end result to the queue.
Introducing this function fixes a long-term bug ;) * rgerhards, 2008-03-14 * EXTRACT from tcps_sess.c */ static rsRetVal ATTR_NONNULL(1, 2) processDataRcvd(ptcpsess_t *const __restrict__ pThis, char **buff, const int buffLen, struct syslogTime *stTime, const time_t ttGenTime, multi_submit_t *pMultiSub, unsigned *const __restrict__ pnMsgs) { DEFiRet; const char c = **buff; int octetsToCopy, octetsToDiscard; if(pThis->startRegex != NULL) { processDataRcvd_regexFraming(pThis, buff, stTime, ttGenTime, pMultiSub, pnMsgs); FINALIZE; } if(pThis->inputState == eAtStrtFram) { if(pThis->bSuppOctetFram && isdigit((int) c)) { pThis->inputState = eInOctetCnt; pThis->iOctetsRemain = 0; pThis->eFraming = TCP_FRAMING_OCTET_COUNTING; } else if(pThis->bSPFramingFix && c == ' ') { /* Cisco very occasionally sends a SP after a LF, which * thrashes framing if not taken special care of. Here, * we permit space *in front of the next frame* and * ignore it. */ FINALIZE; } else { pThis->inputState = eInMsg; pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; } } if(pThis->inputState == eInOctetCnt) { uchar *propPeerName = NULL; int lenPeerName = 0; uchar *propPeerIP = NULL; int lenPeerIP = 0; if(isdigit(c)) { if(pThis->iOctetsRemain <= 200000000) { pThis->iOctetsRemain = pThis->iOctetsRemain * 10 + c - '0'; } if(pThis->iMsg < iMaxLine) { *(pThis->pMsg + pThis->iMsg++) = c; } } else { /* done with the octet count, so this must be the SP terminator */ DBGPRINTF("TCP Message with octet-counter, size %d.\n", pThis->iOctetsRemain); prop.GetString(pThis->peerName, &propPeerName, &lenPeerName); prop.GetString(pThis->peerIP, &propPeerIP, &lenPeerIP); if(c != ' ') { LogError(0, NO_ERRCODE, "Framing Error in received TCP message " "from peer: (hostname) %s, (ip) %s: delimiter is not " "SP but has ASCII value %d.", propPeerName, propPeerIP, c); } if(pThis->iOctetsRemain < 1) { /* TODO: handle the case where the octet count is 0! 
*/ LogError(0, NO_ERRCODE, "Framing Error in received TCP message" " from peer: (hostname) %s, (ip) %s: invalid octet count %d.", propPeerName, propPeerIP, pThis->iOctetsRemain); pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; } else if(pThis->iOctetsRemain > iMaxLine) { /* while we can not do anything against it, we can at least log an indication * that something went wrong) -- rgerhards, 2008-03-14 */ DBGPRINTF("truncating message with %d octets - max msg size is %d\n", pThis->iOctetsRemain, iMaxLine); LogError(0, NO_ERRCODE, "received oversize message from peer: " "(hostname) %s, (ip) %s: size is %d bytes, max msg " "size is %d, truncating...", propPeerName, propPeerIP, pThis->iOctetsRemain, iMaxLine); } if(pThis->iOctetsRemain > pThis->pLstn->pSrv->maxFrameSize) { LogError(0, NO_ERRCODE, "Framing Error in received TCP message " "from peer: (hostname) %s, (ip) %s: frame too large: %d, " "change to octet stuffing", propPeerName, propPeerIP, pThis->iOctetsRemain); pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; } else { pThis->iMsg = 0; } pThis->inputState = eInMsg; } } else if(pThis->inputState == eInMsgTruncation) { if ((c == '\n') || ((pThis->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->iAddtlFrameDelim))) { pThis->inputState = eAtStrtFram; } } else { assert(pThis->inputState == eInMsg); if (pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { int iMsg = pThis->iMsg; /* cache value for faster access */ if(iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ int i = 1; char currBuffChar; while(i < buffLen && ((currBuffChar = (*buff)[i]) != '\n' && (pThis->iAddtlFrameDelim == TCPSRV_NO_ADDTL_DELIMITER || currBuffChar != pThis->iAddtlFrameDelim))) { i++; } LogError(0, NO_ERRCODE, "imptcp %s: message received is at least %d byte larger than " "max msg size; message will be split starting at: \"%.*s\"\n", pThis->pLstn->pSrv->pszInputName, i, (i < 32) ? 
i : 32, *buff); doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); iMsg = 0; ++(*pnMsgs); if(pThis->pLstn->pSrv->discardTruncatedMsg == 1) { pThis->inputState = eInMsgTruncation; } /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good * candidate for a configuration parameter... * rgerhards, 2006-12-04 */ } if ((c == '\n') || ((pThis->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->iAddtlFrameDelim)) ) { /* record delimiter? */ if(pThis->pLstn->pSrv->multiLine) { if((buffLen == 1) || ((*buff)[1] == '<')) { doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); iMsg = 0; /* Reset cached value! */ ++(*pnMsgs); pThis->inputState = eAtStrtFram; } else { if(iMsg < iMaxLine) { pThis->pMsg[iMsg++] = c; } } } else { doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); iMsg = 0; /* Reset cached value! */ ++(*pnMsgs); pThis->inputState = eAtStrtFram; } } else { /* IMPORTANT: here we copy the actual frame content to the message - for BOTH * framing modes! If we have a message that is larger than the max msg size, * we truncate it. This is the best we can do in light of what the engine supports. * -- rgerhards, 2008-03-14 */ if(likely(iMsg < iMaxLine)) { pThis->pMsg[iMsg++] = c; } } pThis->iMsg = iMsg; /* update "real value" with cached one */ } else { assert(pThis->eFraming == TCP_FRAMING_OCTET_COUNTING); octetsToCopy = pThis->iOctetsRemain; octetsToDiscard = 0; if (buffLen < octetsToCopy) { octetsToCopy = buffLen; } if (octetsToCopy + pThis->iMsg > iMaxLine) { octetsToDiscard = octetsToCopy - (iMaxLine - pThis->iMsg); octetsToCopy = iMaxLine - pThis->iMsg; } memcpy(pThis->pMsg + pThis->iMsg, *buff, octetsToCopy); pThis->iMsg += octetsToCopy; pThis->iOctetsRemain -= (octetsToCopy + octetsToDiscard); *buff += (octetsToCopy + octetsToDiscard - 1); if (pThis->iOctetsRemain == 0) { /* we have end of frame! 
*/ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); ++(*pnMsgs); pThis->inputState = eAtStrtFram; } } } finalize_it: RETiRet; } /* Processes the data received via a TCP session. If there * is no other way to handle it, data is discarded. * Input parameter data is the data received, iLen is its * len as returned from recv(). iLen must be 1 or more (that * is errors must be handled by caller!). iTCPSess must be * the index of the TCP session that received the data. * rgerhards 2005-07-04 * And another change while generalizing. We now return either * RS_RET_OK, which means the session should be kept open * or anything else, which means it must be closed. * rgerhards, 2008-03-01 * As a performance optimization, we pick up the timestamp here. Acutally, * this *is* the *correct* reception step for all the data we received, because * we have just received a bunch of data! -- rgerhards, 2009-06-16 * EXTRACT from tcps_sess.c */ static rsRetVal ATTR_NONNULL(1, 2) DataRcvdUncompressed(ptcpsess_t *pThis, char *pData, const size_t iLen, struct syslogTime *stTime, time_t ttGenTime) { multi_submit_t multiSub; smsg_t *pMsgs[CONF_NUM_MULTISUB]; char *pEnd; unsigned nMsgs = 0; DEFiRet; assert(iLen > 0); if(ttGenTime == 0) datetime.getCurrTime(stTime, &ttGenTime, TIME_IN_LOCALTIME); multiSub.ppMsgs = pMsgs; multiSub.maxElem = CONF_NUM_MULTISUB; multiSub.nElem = 0; /* We now copy the message to the session buffer. 
*/ pEnd = pData + iLen; /* this is one off, which is intensional */ while(pData < pEnd) { CHKiRet(processDataRcvd(pThis, &pData, pEnd - pData, stTime, ttGenTime, &multiSub, &nMsgs)); pData++; } iRet = multiSubmitFlush(&multiSub); if(runConf->globals.senderKeepTrack) statsRecordSender(propGetSzStr(pThis->peerName), nMsgs, ttGenTime); finalize_it: RETiRet; } static rsRetVal DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len) { struct syslogTime stTime; time_t ttGenTime; int zRet; /* zlib return state */ unsigned outavail; uchar zipBuf[64*1024]; // TODO: alloc on heap, and much larger (512KiB? batch size!) DEFiRet; // TODO: can we do stats counters? Even if they are not 100% correct under all cases, // by simply updating the input and output sizes? uint64_t outtotal; datetime.getCurrTime(&stTime, &ttGenTime, TIME_IN_LOCALTIME); outtotal = 0; if(!pThis->bzInitDone) { /* allocate deflate state */ pThis->zstrm.zalloc = Z_NULL; pThis->zstrm.zfree = Z_NULL; pThis->zstrm.opaque = Z_NULL; zRet = inflateInit(&pThis->zstrm); if(zRet != Z_OK) { DBGPRINTF("imptcp: error %d returned from zlib/inflateInit()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } pThis->bzInitDone = RSTRUE; } pThis->zstrm.next_in = (Bytef*) buf; pThis->zstrm.avail_in = len; /* run inflate() on buffer until everything has been uncompressed */ do { DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in); pThis->zstrm.avail_out = sizeof(zipBuf); pThis->zstrm.next_out = zipBuf; zRet = inflate(&pThis->zstrm, Z_SYNC_FLUSH); /* no bad return value */ //zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out); outavail = sizeof(zipBuf) - pThis->zstrm.avail_out; if(outavail != 0) { outtotal += outavail; pThis->pLstn->rcvdDecompressed += outavail; CHKiRet(DataRcvdUncompressed(pThis, (char*)zipBuf, outavail, &stTime, ttGenTime)); } } while 
(pThis->zstrm.avail_out == 0); dbgprintf("end of DataRcvCompress, sizes: in %lld, out %llu\n", (long long) len, (long long unsigned) outtotal); finalize_it: RETiRet; } static rsRetVal DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) { struct syslogTime stTime; DEFiRet; ATOMIC_ADD_uint64(&pThis->pLstn->rcvdBytes, &pThis->pLstn->mut_rcvdBytes, iLen); if(pThis->compressionMode >= COMPRESS_STREAM_ALWAYS) iRet = DataRcvdCompressed(pThis, pData, iLen); else iRet = DataRcvdUncompressed(pThis, pData, iLen, &stTime, 0); RETiRet; } /****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/ static void initConfigSettings(void) { cs.bEmitMsgOnClose = 0; cs.bEmitMsgOnOpen = 0; cs.wrkrMax = DFLT_wrkrMax; cs.bSuppOctetFram = 1; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; cs.maxFrameSize = 200000; cs.pszInputName = NULL; cs.pszBindRuleset = NULL; cs.pszInputName = NULL; cs.lstnIP = NULL; } /* add socket to the epoll set */ static rsRetVal addEPollSock(epolld_type_t typ, void *ptr, int sock, epolld_t **pEpd) { DEFiRet; epolld_t *epd = NULL; CHKmalloc(epd = calloc(1, sizeof(epolld_t))); epd->typ = typ; epd->ptr = ptr; epd->sock = sock; *pEpd = epd; epd->ev.events = EPOLLIN|EPOLLONESHOT; epd->ev.data.ptr = (void*) epd; if(epoll_ctl(epollfd, EPOLL_CTL_ADD, sock, &(epd->ev)) != 0) { LogError(errno, RS_RET_EPOLL_CTL_FAILED, "os error during epoll ADD"); ABORT_FINALIZE(RS_RET_EPOLL_CTL_FAILED); } DBGPRINTF("imptcp: added socket %d to epoll[%d] set\n", sock, epollfd); finalize_it: if(iRet != RS_RET_OK) { if (epd != NULL) { LogError(0, RS_RET_INTERNAL_ERROR, "error: could not initialize mutex for ptcp " "connection for socket: %d", sock); } free(epd); } RETiRet; } /* add a listener to the server */ static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6) { DEFiRet; ptcplstn_t *pLstn = NULL; uchar statname[64]; CHKmalloc(pLstn = calloc(1, sizeof(ptcplstn_t))); pLstn->pSrv = pSrv; pLstn->bSuppOctetFram = 
pSrv->bSuppOctetFram; pLstn->bSPFramingFix = pSrv->bSPFramingFix; pLstn->sock = sock; /* support statistics gathering */ uchar *inputname; if(pSrv->pszInputName == NULL) { inputname = (uchar*)"imptcp"; } else { inputname = pSrv->pszInputName; } CHKiRet(statsobj.Construct(&(pLstn->stats))); snprintf((char*)statname, sizeof(statname), "%s(%s/%s/%s)", inputname, (pSrv->lstnIP == NULL) ? "*" : (char*)pSrv->lstnIP, pSrv->port, isIPv6 ? "IPv6" : "IPv4"); statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ CHKiRet(statsobj.SetName(pLstn->stats, statname)); CHKiRet(statsobj.SetOrigin(pLstn->stats, (uchar*)"imptcp")); STATSCOUNTER_INIT(pLstn->ctrSubmit, pLstn->mutCtrSubmit); CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSubmit))); STATSCOUNTER_INIT(pLstn->ctrSessOpen, pLstn->mutCtrSessOpen); CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("sessions.opened"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSessOpen))); STATSCOUNTER_INIT(pLstn->ctrSessOpenErr, pLstn->mutCtrSessOpenErr); CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("sessions.openfailed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSessOpenErr))); STATSCOUNTER_INIT(pLstn->ctrSessClose, pLstn->mutCtrSessClose); CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("sessions.closed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->ctrSessClose))); /* the following counters are not protected by mutexes; we accept * that they may not be 100% correct */ pLstn->rcvdBytes = 0, pLstn->rcvdDecompressed = 0; INIT_ATOMIC_HELPER_MUT64(pLstn->mut_rcvdBytes); CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.received"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->rcvdBytes))); CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.decompressed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pLstn->rcvdDecompressed))); CHKiRet(statsobj.ConstructFinalize(pLstn->stats)); 
CHKiRet(addEPollSock(epolld_lstn, pLstn, sock, &pLstn->epd)); /* add to start of server's listener list */ pLstn->prev = NULL; pLstn->next = pSrv->pLstn; if(pSrv->pLstn != NULL) pSrv->pLstn->prev = pLstn; pSrv->pLstn = pLstn; finalize_it: if(iRet != RS_RET_OK) { if(pLstn != NULL) { if(pLstn->stats != NULL) statsobj.Destruct(&(pLstn->stats)); free(pLstn); } } RETiRet; } /* add a session to the server */ static rsRetVal addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP) { DEFiRet; ptcpsess_t *pSess = NULL; ptcpsrv_t *pSrv = pLstn->pSrv; int pmsg_size_factor; CHKmalloc(pSess = malloc(sizeof(ptcpsess_t))); pSess->next = NULL; if(pLstn->pSrv->inst->startRegex == NULL) { pmsg_size_factor = 1; pSess->pMsg_save = NULL; } else { pmsg_size_factor = 2; pSess->pMsg = NULL; CHKmalloc(pSess->pMsg_save = malloc(1 + iMaxLine * pmsg_size_factor)); } CHKmalloc(pSess->pMsg = malloc(1 + iMaxLine * pmsg_size_factor)); pSess->pLstn = pLstn; pSess->sock = sock; pSess->bSuppOctetFram = pLstn->bSuppOctetFram; pSess->bSPFramingFix = pLstn->bSPFramingFix; pSess->inputState = eAtStrtFram; pSess->iMsg = 0; pSess->iCurrLine = 1; pSess->bzInitDone = 0; pSess->bAtStrtOfFram = 1; pSess->peerName = peerName; pSess->peerIP = peerIP; pSess->compressionMode = pLstn->pSrv->compressionMode; pSess->startRegex = pLstn->pSrv->inst->startRegex; pSess->iAddtlFrameDelim = pLstn->pSrv->iAddtlFrameDelim; /* add to start of server's listener list */ pSess->prev = NULL; pthread_mutex_lock(&pSrv->mutSessLst); int iTCPSessMax = pSrv->inst->iTCPSessMax; if (iTCPSessMax > 0 && pSrv->iTCPSessCnt >= iTCPSessMax) { pthread_mutex_unlock(&pSrv->mutSessLst); LogError(0, RS_RET_MAX_SESS_REACHED, "too many tcp sessions - dropping incoming request"); ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED); } pSrv->iTCPSessCnt++; pSess->next = pSrv->pSess; if(pSrv->pSess != NULL) pSrv->pSess->prev = pSess; pSrv->pSess = pSess; pthread_mutex_unlock(&pSrv->mutSessLst); CHKiRet(addEPollSock(epolld_sess, pSess, sock, 
&pSess->epd)); finalize_it: if(iRet != RS_RET_OK) { if(pSess != NULL) { if (pSess->next != NULL) { unlinkSess(pSess); } free(pSess->pMsg_save); free(pSess->pMsg); free(pSess); } } RETiRet; } /* finish zlib buffer, to be called before closing the session. */ static rsRetVal doZipFinish(ptcpsess_t *pSess) { int zRet; /* zlib return state */ DEFiRet; unsigned outavail; struct syslogTime stTime; uchar zipBuf[32*1024]; // TODO: use "global" one from pSess if(!pSess->bzInitDone) goto done; pSess->zstrm.avail_in = 0; /* run inflate() on buffer until everything has been compressed */ do { DBGPRINTF("doZipFinish: in inflate() loop, avail_in %d, total_in %ld\n", pSess->zstrm.avail_in, pSess->zstrm.total_in); pSess->zstrm.avail_out = sizeof(zipBuf); pSess->zstrm.next_out = zipBuf; zRet = inflate(&pSess->zstrm, Z_FINISH); /* no bad return value */ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pSess->zstrm.avail_out); outavail = sizeof(zipBuf) - pSess->zstrm.avail_out; if(outavail != 0) { pSess->pLstn->rcvdDecompressed += outavail; CHKiRet(DataRcvdUncompressed(pSess, (char*)zipBuf, outavail, &stTime, 0)); // TODO: query time! } } while (pSess->zstrm.avail_out == 0); finalize_it: zRet = inflateEnd(&pSess->zstrm); if(zRet != Z_OK) { DBGPRINTF("imptcp: error %d returned from zlib/inflateEnd()\n", zRet); } pSess->bzInitDone = 0; done: RETiRet; } /* close/remove a session * NOTE: we do not need to remove the socket from the epoll set, as according * to the epoll man page it is automatically removed on close (Q6). The only * exception is duplicated file handles, which we do not create. 
*/ static rsRetVal closeSess(ptcpsess_t *pSess) { DEFiRet; if(pSess->compressionMode >= COMPRESS_STREAM_ALWAYS) doZipFinish(pSess); const int sock = pSess->sock; close(sock); unlinkSess(pSess); if(pSess->pLstn->pSrv->bEmitMsgOnClose) { LogMsg(0, RS_RET_NO_ERRCODE, LOG_INFO, "imptcp: session on socket %d closed " "with iRet %d.\n", sock, iRet); } STATSCOUNTER_INC(pSess->pLstn->ctrSessClose, pSess->pLstn->mutCtrSessClose); /* unlinked, now remove structure */ destructSess(pSess); DBGPRINTF("imptcp: session on socket %d closed with iRet %d.\n", sock, iRet); RETiRet; } /* create input instance, set default parameters, and * add it to the list of instances. */ static rsRetVal createInstance(instanceConf_t **pinst) { instanceConf_t *inst; DEFiRet; CHKmalloc(inst = malloc(sizeof(instanceConf_t))); inst->next = NULL; inst->pszBindPort = NULL; inst->pszBindAddr = NULL; inst->pszBindPath = NULL; inst->fileUID = -1; inst->fileGID = -1; inst->maxFrameSize = 200000; inst->fCreateMode = 0644; inst->bFailOnPerms = 1; inst->bUnlink = 0; inst->discardTruncatedMsg = 0; inst->flowControl = 1; inst->pszBindRuleset = NULL; inst->pszInputName = NULL; inst->bSuppOctetFram = 1; inst->bSPFramingFix = 0; inst->bKeepAlive = 0; inst->iKeepAliveIntvl = 0; inst->iKeepAliveProbes = 0; inst->iKeepAliveTime = 0; inst->bEmitMsgOnClose = 0; inst->bEmitMsgOnOpen = 0; inst->dfltTZ = NULL; inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; inst->startRegex = NULL; inst->pBindRuleset = NULL; inst->ratelimitBurst = 10000; /* arbitrary high limit */ inst->ratelimitInterval = 0; /* off */ inst->compressionMode = COMPRESS_SINGLE_MSG; inst->multiLine = 0; inst->socketBacklog = 5; inst->pszLstnPortFileName = NULL; inst->iTCPSessMax = -1; /* node created, let's add to config */ if(loadModConf->tail == NULL) { loadModConf->tail = loadModConf->root = inst; } else { loadModConf->tail->next = inst; loadModConf->tail = inst; } *pinst = inst; finalize_it: RETiRet; } /* This function is called when a new listener 
instace shall be added to * the current config object via the legacy config system. It just shuffles * all parameters to the listener in-memory instance. */ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *const pNewVal) { instanceConf_t *inst; DEFiRet; if(pNewVal == NULL || *pNewVal == '\0') { parser_errmsg("imptcp: port number must be specified, listener ignored"); ABORT_FINALIZE(RS_RET_PARAM_ERROR); } /* if we reach this point, a valid port is given in pNewVal */ CHKiRet(createInstance(&inst)); CHKmalloc(inst->pszBindPort = ustrdup(pNewVal)); if((cs.lstnIP == NULL) || (cs.lstnIP[0] == '\0')) { inst->pszBindAddr = NULL; } else { CHKmalloc(inst->pszBindAddr = ustrdup(cs.lstnIP)); } if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) { inst->pszBindRuleset = NULL; } else { CHKmalloc(inst->pszBindRuleset = ustrdup(cs.pszBindRuleset)); } if((cs.pszInputName == NULL) || (cs.pszInputName[0] == '\0')) { inst->pszInputName = NULL; } else { CHKmalloc(inst->pszInputName = ustrdup(cs.pszInputName)); } inst->pBindRuleset = NULL; inst->bSuppOctetFram = cs.bSuppOctetFram; inst->bKeepAlive = cs.bKeepAlive; inst->iKeepAliveIntvl = cs.iKeepAliveIntvl; inst->iKeepAliveProbes = cs.iKeepAliveProbes; inst->iKeepAliveTime = cs.iKeepAliveTime; inst->bEmitMsgOnClose = cs.bEmitMsgOnClose; inst->bEmitMsgOnOpen = cs.bEmitMsgOnOpen; inst->iAddtlFrameDelim = cs.iAddtlFrameDelim; inst->maxFrameSize = cs.maxFrameSize; inst->iTCPSessMax = cs.iTCPSessMax; finalize_it: free(pNewVal); RETiRet; } static rsRetVal addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) { DEFiRet; ptcpsrv_t *pSrv = NULL; CHKmalloc(pSrv = calloc(1, sizeof(ptcpsrv_t))); pthread_mutex_init(&pSrv->mutSessLst, NULL); pSrv->ratelimiter = NULL; pSrv->pSess = NULL; pSrv->pLstn = NULL; pSrv->inst = inst; pSrv->bSuppOctetFram = inst->bSuppOctetFram; pSrv->bSPFramingFix = inst->bSPFramingFix; pSrv->bKeepAlive = inst->bKeepAlive; pSrv->iKeepAliveIntvl = 
inst->iKeepAliveIntvl; pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; pSrv->bEmitMsgOnOpen = inst->bEmitMsgOnOpen; pSrv->compressionMode = inst->compressionMode; pSrv->dfltTZ = inst->dfltTZ; if (inst->pszBindPort != NULL) { CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); } pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; pSrv->multiLine = inst->multiLine; pSrv->socketBacklog = inst->socketBacklog; pSrv->pszLstnPortFileName = inst->pszLstnPortFileName; pSrv->maxFrameSize = inst->maxFrameSize; if (inst->pszBindAddr == NULL) { pSrv->lstnIP = NULL; } else { CHKmalloc(pSrv->lstnIP = ustrdup(inst->pszBindAddr)); } if (inst->pszBindPath == NULL) { pSrv->path = NULL; } else { CHKmalloc(pSrv->path = ustrdup(inst->pszBindPath)); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPath)); pSrv->bUnixSocket = 1; pSrv->fCreateMode = inst->fCreateMode; pSrv->fileUID = inst->fileUID; pSrv->fileGID = inst->fileGID; pSrv->bFailOnPerms = inst->bFailOnPerms; } pSrv->bUnlink = inst->bUnlink; pSrv->discardTruncatedMsg = inst->discardTruncatedMsg; pSrv->flowControl = inst->flowControl; pSrv->pRuleset = inst->pBindRuleset; pSrv->pszInputName = ustrdup((inst->pszInputName == NULL) ? UCHAR_CONSTANT("imptcp") : inst->pszInputName); pSrv->iTCPSessMax = inst->iTCPSessMax; CHKiRet(prop.Construct(&pSrv->pInputName)); CHKiRet(prop.SetString(pSrv->pInputName, pSrv->pszInputName, ustrlen(pSrv->pszInputName))); CHKiRet(prop.ConstructFinalize(pSrv->pInputName)); CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imptcp", (char*) pSrv->port)); ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst); ratelimitSetThreadSafe(pSrv->ratelimiter); /* add to linked list */ pSrv->pNext = pSrvRoot; pSrvRoot = pSrv; /* all config vars are auto-reset -- this also is very useful with the * new config format effort (v6). 
*/ resetConfigVariables(NULL, NULL); finalize_it: if(iRet != RS_RET_OK) { LogError(0, NO_ERRCODE, "error %d trying to add listener", iRet); if(pSrv != NULL) { destructSrv(pSrv); } } RETiRet; } /* destroy worker pool structures and wait for workers to terminate */ static void startWorkerPool(void) { int i; pthread_mutex_lock(&io_q.mut); /* locking to keep Coverity happy */ wrkrRunning = 0; pthread_mutex_unlock(&io_q.mut); DBGPRINTF("imptcp: starting worker pool, %d workers\n", runModConf->wrkrMax); wrkrInfo = calloc(runModConf->wrkrMax, sizeof(struct wrkrInfo_s)); if (wrkrInfo == NULL) { LogError(errno, RS_RET_OUT_OF_MEMORY, "imptcp: worker-info array allocation failed."); return; } for(i = 0 ; i < runModConf->wrkrMax ; ++i) { /* init worker info structure! */ wrkrInfo[i].wrkrIdx = i; wrkrInfo[i].numCalled = 0; pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); } } /* destroy worker pool structures and wait for workers to terminate */ static void stopWorkerPool(void) { int i; DBGPRINTF("imptcp: stopping worker pool\n"); pthread_mutex_lock(&io_q.mut); pthread_cond_broadcast(&io_q.wakeup_worker); /* awake wrkr if not running */ pthread_mutex_unlock(&io_q.mut); for(i = 0 ; i < runModConf->wrkrMax ; ++i) { pthread_join(wrkrInfo[i].tid, NULL); DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); } free(wrkrInfo); } /* start up all listeners * This is a one-time stop once the module is set to start. 
*/ static rsRetVal startupServers(void) { DEFiRet; rsRetVal localRet, lastErr; int iOK; int iAll; ptcpsrv_t *pSrv; iAll = iOK = 0; lastErr = RS_RET_ERR; pSrv = pSrvRoot; while(pSrv != NULL) { DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); localRet = startupSrv(pSrv); if(localRet == RS_RET_OK) iOK++; else lastErr = localRet; ++iAll; pSrv = pSrv->pNext; } DBGPRINTF("imptcp: %d out of %d servers started successfully\n", iOK, iAll); if(iOK == 0) /* iff all fails, we report an error */ iRet = lastErr; RETiRet; } /* process new activity on listener. This means we need to accept a new * connection. */ static rsRetVal ATTR_NONNULL() lstnActivity(ptcplstn_t *const pLstn) { int newSock = -1; prop_t *peerName; prop_t *peerIP; rsRetVal localRet; DEFiRet; DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock); while(glbl.GetGlobalInputTermState() == 0) { localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP); DBGPRINTF("imptcp: AcceptConnReq on listen socket %d returned %d\n", pLstn->sock, localRet); if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1) { break; } CHKiRet(localRet); localRet = addSess(pLstn, newSock, peerName, peerIP); if(localRet != RS_RET_OK) { close(newSock); prop.Destruct(&peerName); prop.Destruct(&peerIP); ABORT_FINALIZE(localRet); } } finalize_it: RETiRet; } /* process new activity on session. This means we need to accept data * or close the session. */ static rsRetVal sessActivity(ptcpsess_t *const pSess, int *const continue_polling) { int lenRcv; int lenBuf; uchar *peerName; int lenPeer; int remsock = 0; /* init just to keep compiler happy... 
:-( */ sbool bEmitOnClose = 0; char rcvBuf[128*1024]; int runs = 0; DEFiRet; DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock); while(runs++ < 16) { lenBuf = sizeof(rcvBuf); lenRcv = recv(pSess->sock, rcvBuf, lenBuf, 0); if(lenRcv > 0) { /* have data, process it */ DBGPRINTF("imptcp: data(%d) on socket %d: %s\n", lenBuf, pSess->sock, rcvBuf); CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv)); } else if (lenRcv == 0) { /* session was closed, do clean-up */ if(pSess->pLstn->pSrv->bEmitMsgOnClose) { prop.GetString(pSess->peerName, &peerName, &lenPeer), remsock = pSess->sock; bEmitOnClose = 1; } *continue_polling = 0; if(bEmitOnClose) { LogError(0, RS_RET_PEER_CLOSED_CONN, "imptcp session %d closed by " "remote peer %s.", remsock, peerName); } CHKiRet(closeSess(pSess)); /* close may emit more messages in strmzip mode! */ break; } else { if(CHK_EAGAIN_EWOULDBLOCK) break; DBGPRINTF("imptcp: error on session socket %d - closed.\n", pSess->sock); *continue_polling = 0; closeSess(pSess); /* try clean-up by dropping session */ break; } } finalize_it: RETiRet; } /* This function is called to process a single request. This may * be carried out by the main worker or a helper. It can be run * concurrently. 
*/ static void processWorkItem(epolld_t *epd) { int continue_polling = 1; switch(epd->typ) { case epolld_lstn: /* listener never stops polling (except server shutdown) */ lstnActivity((ptcplstn_t *) epd->ptr); break; case epolld_sess: sessActivity((ptcpsess_t *) epd->ptr, &continue_polling); break; default: LogError(0, RS_RET_INTERNAL_ERROR, "error: invalid epolld_type_t %d after epoll", epd->typ); break; } if (continue_polling == 1) { epoll_ctl(epollfd, EPOLL_CTL_MOD, epd->sock, &(epd->ev)); } } static rsRetVal initIoQ(void) { DEFiRet; CHKiConcCtrl(pthread_mutex_init(&io_q.mut, NULL)); CHKiConcCtrl(pthread_cond_init(&io_q.wakeup_worker, NULL)); STAILQ_INIT(&io_q.q); io_q.sz = 0; io_q.ctrMaxSz = 0; /* TODO: discuss this and fix potential concurrent read/write issues */ CHKiRet(statsobj.Construct(&io_q.stats)); CHKiRet(statsobj.SetName(io_q.stats, (uchar*) "io-work-q")); CHKiRet(statsobj.SetOrigin(io_q.stats, (uchar*) "imptcp")); STATSCOUNTER_INIT(io_q.ctrEnq, io_q.mutCtrEnq); CHKiRet(statsobj.AddCounter(io_q.stats, UCHAR_CONSTANT("enqueued"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &io_q.ctrEnq)); CHKiRet(statsobj.AddCounter(io_q.stats, UCHAR_CONSTANT("maxqsize"), ctrType_Int, CTR_FLAG_NONE, &io_q.ctrMaxSz)); CHKiRet(statsobj.ConstructFinalize(io_q.stats)); finalize_it: RETiRet; } static void destroyIoQ(void) { io_req_t *n; if (io_q.stats != NULL) { statsobj.Destruct(&io_q.stats); } pthread_mutex_lock(&io_q.mut); while (!STAILQ_EMPTY(&io_q.q)) { n = STAILQ_FIRST(&io_q.q); STAILQ_REMOVE_HEAD(&io_q.q, link); LogError(0, RS_RET_INTERNAL_ERROR, "imptcp: discarded enqueued io-work to allow shutdown " "- ignored"); free(n); } io_q.sz = 0; pthread_mutex_unlock(&io_q.mut); pthread_cond_destroy(&io_q.wakeup_worker); pthread_mutex_destroy(&io_q.mut); } static rsRetVal enqueueIoWork(epolld_t *epd, int dispatchInlineIfQueueFull) { io_req_t *n; int dispatchInline; int inlineDispatchThreshold; DEFiRet; CHKmalloc(n = malloc(sizeof(io_req_t))); n->epd = epd; inlineDispatchThreshold = 
DFLT_inlineDispatchThreshold * runModConf->wrkrMax; dispatchInline = 0; pthread_mutex_lock(&io_q.mut); if (dispatchInlineIfQueueFull && io_q.sz > inlineDispatchThreshold) { dispatchInline = 1; } else { STAILQ_INSERT_TAIL(&io_q.q, n, link); io_q.sz++; STATSCOUNTER_INC(io_q.ctrEnq, io_q.mutCtrEnq); STATSCOUNTER_SETMAX_NOMUT(io_q.ctrMaxSz, io_q.sz); pthread_cond_signal(&io_q.wakeup_worker); } pthread_mutex_unlock(&io_q.mut); if (dispatchInline == 1) { free(n); processWorkItem(epd); } finalize_it: if (iRet != RS_RET_OK) { if (n == NULL) { LogError(0, iRet, "imptcp: couldn't allocate memory to enqueue io-request - ignored"); } } RETiRet; } /* This function is called to process a complete workset, that * is a set of events returned from epoll. */ static void processWorkSet(int nEvents, struct epoll_event events[]) { int iEvt; int remainEvents; remainEvents = nEvents; epolld_t *epd; for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) { epd = (epolld_t*)events[iEvt].data.ptr; if(runModConf->bProcessOnPoller && remainEvents == 1) { /* process self, save context switch */ processWorkItem(epd); } else { enqueueIoWork(epd, runModConf->bProcessOnPoller); } --remainEvents; } } /* worker to process incoming requests */ static void * wrkr(void *myself) { struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; pthread_mutex_lock(&io_q.mut); ++wrkrRunning; pthread_mutex_unlock(&io_q.mut); uchar thrdName[32]; snprintf((char*)thrdName, sizeof(thrdName), "imptcp/w%d", me->wrkrIdx); # if defined(HAVE_PRCTL) && defined(PR_SET_NAME) /* set thread name - we ignore if the call fails, has no harsh consequences... 
*/ if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) { DBGPRINTF("prctl failed, not setting thread name for '%s'\n", thrdName); } # endif io_req_t *n; while(1) { n = NULL; pthread_mutex_lock(&io_q.mut); if (io_q.sz == 0) { --wrkrRunning; if (glbl.GetGlobalInputTermState() != 0) { pthread_mutex_unlock(&io_q.mut); break; } else { DBGPRINTF("imptcp: worker %llu waiting on new work items\n", (unsigned long long) me->tid); pthread_cond_wait(&io_q.wakeup_worker, &io_q.mut); DBGPRINTF("imptcp: worker %llu awoken\n", (unsigned long long) me->tid); } ++wrkrRunning; } if (io_q.sz > 0) { n = STAILQ_FIRST(&io_q.q); STAILQ_REMOVE_HEAD(&io_q.q, link); io_q.sz--; } pthread_mutex_unlock(&io_q.mut); if (n != NULL) { ++me->numCalled; processWorkItem(n->epd); free(n); } } return NULL; } BEGINnewInpInst struct cnfparamvals *pvals; instanceConf_t *inst; char *cstr; int i; CODESTARTnewInpInst DBGPRINTF("newInpInst (imptcp)\n"); if((pvals = nvlstGetParams(lst, &inppblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } if(Debug) { dbgprintf("input param blk in imptcp:\n"); cnfparamsPrint(&inppblk, pvals); } CHKiRet(createInstance(&inst)); for(i = 0 ; i < inppblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; if(!strcmp(inppblk.descr[i].name, "port")) { inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "address")) { inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "path")) { inst->pszBindPath = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "unlink")) { inst->bUnlink = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "discardtruncatedmsg")) { inst->discardTruncatedMsg = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "flowcontrol")) { inst->flowControl = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "fileowner")) { inst->fileUID = (int) pvals[i].val.d.n; } else 
if(!strcmp(inppblk.descr[i].name, "fileownernum")) { inst->fileUID = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "filegroup")) { inst->fileGID = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "filegroupnum")) { inst->fileGID = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "filecreatemode")) { inst->fCreateMode = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "failonpermsfailure")) { inst->bFailOnPerms = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "name")) { inst->pszInputName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "maxframesize")) { const int max = (int) pvals[i].val.d.n; if(max <= 200000000) { inst->maxFrameSize = max; } else { parser_errmsg("imptcp: invalid value for 'maxFrameSize' " "parameter given is %d, max is 200000000", max); ABORT_FINALIZE(RS_RET_PARAM_ERROR); } } else if(!strcmp(inppblk.descr[i].name, "framing.delimiter.regex")) { inst->startRegex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "supportoctetcountedframing")) { inst->bSuppOctetFram = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "framingfix.cisco.asa")) { inst->bSPFramingFix = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "compression.mode")) { cstr = es_str2cstr(pvals[i].val.d.estr, NULL); if(!strcasecmp(cstr, "stream:always")) { inst->compressionMode = COMPRESS_STREAM_ALWAYS; } else if(!strcasecmp(cstr, "none")) { inst->compressionMode = COMPRESS_NEVER; } else { parser_errmsg("imptcp: invalid value for 'compression.mode' " "parameter (given is '%s')", cstr); free(cstr); ABORT_FINALIZE(RS_RET_PARAM_ERROR); } free(cstr); } else if(!strcmp(inppblk.descr[i].name, "maxsessions")) { inst->iTCPSessMax = (int) pvals[i].val.d.n; } else 
if(!strcmp(inppblk.descr[i].name, "keepalive")) { inst->bKeepAlive = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "keepalive.probes")) { inst->iKeepAliveProbes = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "keepalive.time")) { inst->iKeepAliveTime = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "keepalive.interval")) { inst->iKeepAliveIntvl = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "addtlframedelimiter")) { inst->iAddtlFrameDelim = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) { inst->bEmitMsgOnClose = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionopen")) { inst->bEmitMsgOnOpen = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "defaulttz")) { inst->dfltTZ = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) { inst->ratelimitBurst = (unsigned int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) { inst->ratelimitInterval = (unsigned int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "multiline")) { inst->multiLine = (sbool) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "listenportfilename")) { inst->pszLstnPortFileName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "socketbacklog")) { inst->socketBacklog = (int) pvals[i].val.d.n; } else { dbgprintf("imptcp: program error, non-handled " "param '%s'\n", inppblk.descr[i].name); } char *bindPort = (char *) inst->pszBindPort; char *bindPath = (char *) inst->pszBindPath; if ((bindPort == NULL || strlen(bindPort) < 1) && (bindPath == NULL || strlen (bindPath) < 1)) { parser_errmsg("imptcp: Must have either port or path defined"); ABORT_FINALIZE(RS_RET_PARAM_ERROR); } } if(inst->startRegex != NULL) { const int errcode = regcomp(&inst->start_preg, (char*)inst->startRegex, 
REG_EXTENDED); if(errcode != 0) { char errbuff[512]; regerror(errcode, &inst->start_preg, errbuff, sizeof(errbuff)); parser_errmsg("imptcp: error in framing.delimiter.regex expansion: %s", errbuff); ABORT_FINALIZE(RS_RET_ERR); } } if (inst->iTCPSessMax == -1) { inst->iTCPSessMax = loadModConf->iTCPSessMax; } finalize_it: CODE_STD_FINALIZERnewInpInst cnfparamvalsDestruct(pvals, &inppblk); ENDnewInpInst BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; pModConf->pConf = pConf; /* init our settings */ loadModConf->wrkrMax = DFLT_wrkrMax; loadModConf->bProcessOnPoller = 1; loadModConf->configSetViaV2Method = 0; bLegacyCnfModGlobalsPermitted = 1; /* init legacy config vars */ initConfigSettings(); ENDbeginCnfLoad BEGINsetModCnf struct cnfparamvals *pvals = NULL; int i; CODESTARTsetModCnf pvals = nvlstGetParams(lst, &modpblk, NULL); if(pvals == NULL) { LogError(0, RS_RET_MISSING_CNFPARAMS, "imptcp: error processing module " "config parameters [module(...)]"); ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } if(Debug) { dbgprintf("module (global) param blk for imptcp:\n"); cnfparamsPrint(&modpblk, pvals); } for(i = 0 ; i < modpblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; if(!strcmp(modpblk.descr[i].name, "threads")) { loadModConf->wrkrMax = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "maxsessions")) { loadModConf->iTCPSessMax = (int) pvals[i].val.d.n; } else if(!strcmp(modpblk.descr[i].name, "processOnPoller")) { loadModConf->bProcessOnPoller = (int) pvals[i].val.d.n; } else { dbgprintf("imptcp: program error, non-handled " "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); } } /* remove all of our legacy handlers, as they can not used in addition * the the new-style config method. 
*/ bLegacyCnfModGlobalsPermitted = 0; loadModConf->configSetViaV2Method = 1; finalize_it: if(pvals != NULL) cnfparamvalsDestruct(pvals, &modpblk); ENDsetModCnf BEGINendCnfLoad CODESTARTendCnfLoad if(!loadModConf->configSetViaV2Method) { /* persist module-specific settings from legacy config system */ loadModConf->wrkrMax = cs.wrkrMax; } loadModConf = NULL; /* done loading */ /* free legacy config vars */ free(cs.pszInputName); free(cs.lstnIP); cs.pszInputName = NULL; cs.lstnIP = NULL; ENDendCnfLoad /* function to generate error message if framework does not find requested ruleset */ static inline void std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst) { LogError(0, NO_ERRCODE, "imptcp: ruleset '%s' for port %s not found - " "using default ruleset instead", inst->pszBindRuleset, inst->pszBindPort); } BEGINcheckCnf instanceConf_t *inst; CODESTARTcheckCnf for(inst = pModConf->root ; inst != NULL ; inst = inst->next) { std_checkRuleset(pModConf, inst); } ENDcheckCnf BEGINactivateCnfPrePrivDrop instanceConf_t *inst; CODESTARTactivateCnfPrePrivDrop iMaxLine = glbl.GetMaxLine(runConf); /* get maximum size we currently support */ DBGPRINTF("imptcp: config params iMaxLine %d\n", iMaxLine); runModConf = pModConf; for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { addListner(pModConf, inst); } if(pSrvRoot == NULL) { LogError(0, RS_RET_NO_LSTN_DEFINED, "imptcp: no ptcp server defined, module can not run."); ABORT_FINALIZE(RS_RET_NO_RUN); } # if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) DBGPRINTF("imptcp uses epoll_create1()\n"); epollfd = epoll_create1(EPOLL_CLOEXEC); if(epollfd < 0 && errno == ENOSYS) # endif { DBGPRINTF("imptcp uses epoll_create()\n"); /* reading the docs, the number of epoll events passed to * epoll_create() seems not to be used at all in kernels. So * we just provide "a" number, happens to be 10. 
*/ epollfd = epoll_create(10); } if(epollfd < 0) { LogError(0, RS_RET_EPOLL_CR_FAILED, "error: epoll_create() failed"); ABORT_FINALIZE(RS_RET_NO_RUN); } /* start up servers, but do not yet read input data */ CHKiRet(startupServers()); DBGPRINTF("imptcp started up, but not yet receiving data\n"); finalize_it: ENDactivateCnfPrePrivDrop BEGINactivateCnf CODESTARTactivateCnf /* nothing to do, all done pre priv drop */ ENDactivateCnf BEGINfreeCnf instanceConf_t *inst, *del; CODESTARTfreeCnf for(inst = pModConf->root ; inst != NULL ; ) { free(inst->pszBindPort); free(inst->pszBindPath); free(inst->pszBindAddr); free(inst->pszBindRuleset); free(inst->pszInputName); free(inst->dfltTZ); if(inst->startRegex != NULL) { regfree(&inst->start_preg); free(inst->startRegex); } del = inst; inst = inst->next; free(del); } ENDfreeCnf /* This function is called to gather input. */ BEGINrunInput int nEvents; struct epoll_event events[128]; CODESTARTrunInput initIoQ(); startWorkerPool(); DBGPRINTF("imptcp: now beginning to process input data\n"); while(glbl.GetGlobalInputTermState() == 0) { DBGPRINTF("imptcp going on epoll_wait\n"); nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); DBGPRINTF("imptcp: epoll returned %d events\n", nEvents); processWorkSet(nEvents, events); } DBGPRINTF("imptcp: successfully terminated\n"); /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */ ENDrunInput /* initialize and return if will run or not */ BEGINwillRun CODESTARTwillRun ENDwillRun /* completely shut down a server, that means closing all of its * listeners and sessions. 
*/ static void shutdownSrv(ptcpsrv_t *pSrv) { ptcplstn_t *pLstn, *lstnDel; ptcpsess_t *pSess, *sessDel; /* listeners */ pLstn = pSrv->pLstn; while(pLstn != NULL) { close(pLstn->sock); statsobj.Destruct(&(pLstn->stats)); /* now unlink listner */ lstnDel = pLstn; pLstn = pLstn->next; DBGPRINTF("imptcp shutdown listen socket %d (rcvd %lld bytes, " "decompressed %lld)\n", lstnDel->sock, lstnDel->rcvdBytes, lstnDel->rcvdDecompressed); free(lstnDel->epd); free(lstnDel); } if (pSrv->bUnixSocket && pSrv->bUnlink) { unlink((char*) pSrv->path); } /* sessions */ pSess = pSrv->pSess; while(pSess != NULL) { close(pSess->sock); sessDel = pSess; pSess = pSess->next; DBGPRINTF("imptcp shutdown session socket %d\n", sessDel->sock); destructSess(sessDel); } } BEGINafterRun ptcpsrv_t *pSrv, *srvDel; CODESTARTafterRun stopWorkerPool(); destroyIoQ(); /* we need to close everything that is still open */ pSrv = pSrvRoot; while(pSrv != NULL) { srvDel = pSrv; pSrv = pSrv->pNext; shutdownSrv(srvDel); destructSrv(srvDel); } close(epollfd); ENDafterRun BEGINmodExit CODESTARTmodExit pthread_attr_destroy(&wrkrThrdAttr); /* release objects we used */ objRelease(glbl, CORE_COMPONENT); objRelease(statsobj, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); objRelease(datetime, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); ENDmodExit static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; cs.bEmitMsgOnOpen = 0; cs.wrkrMax = DFLT_wrkrMax; cs.bKeepAlive = 0; cs.iKeepAliveProbes = 0; cs.iKeepAliveTime = 0; cs.iKeepAliveIntvl = 0; cs.bSuppOctetFram = 1; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; cs.maxFrameSize = 200000; free(cs.pszInputName); cs.pszInputName = NULL; free(cs.lstnIP); cs.lstnIP = NULL; return RS_RET_OK; } BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURENonCancelInputTermination) iRet = RS_RET_OK; ENDisCompatibleWithFeature 
/* Entry-point table handed to the rsyslog core. Each CODEqueryEtryPt_*
 * macro below adds one group of entry points (see module-template.h).
 */
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt


/* Module initialization: acquire the core objects this module uses (they
 * are released again in modExit), prepare the thread attributes in
 * wrkrThrdAttr, initialize the legacy settings and register the legacy
 * "inputptcpserver*" config directive handlers.
 */
BEGINmodInit()
CODESTARTmodInit
	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
	/* request objects we use */
	CHKiRet(objUse(glbl, CORE_COMPONENT));
	CHKiRet(objUse(statsobj, CORE_COMPONENT));
	CHKiRet(objUse(prop, CORE_COMPONENT));
	CHKiRet(objUse(net, LM_NET_FILENAME));
	CHKiRet(objUse(datetime, CORE_COMPONENT));
	CHKiRet(objUse(ruleset, CORE_COMPONENT));

	/* initialize "read-only" thread attributes */
	pthread_attr_init(&wrkrThrdAttr);
	/* stack size for threads created with wrkrThrdAttr: 4 MiB */
	pthread_attr_setstacksize(&wrkrThrdAttr, 4096*1024);

	/* init legacy config settings */
	initConfigSettings();

	/* register config file handlers */
	/* "inputptcpserverrun" calls addInstance(); the others store directly into cs */
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
		addInstance, NULL, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary,
		NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt,
		NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt,
		NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt,
		NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserversupportoctetcountedframing"), 0,
		eCmdHdlrBinary, NULL, &cs.bSuppOctetFram, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0,
		eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
		NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0,
		eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0,
		eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0,
		eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID));
	/* module-global parameters */
	/* the last argument ties this handler to bLegacyCnfModGlobalsPermitted,
	 * which beginCnfLoad sets and setModCnf clears once module() params are given */
	CHKiRet(regCfSysLineHdlr2(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt,
		NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted));
	CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler,
		resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit
/* vim:set ai: */