diff options
Diffstat (limited to 'threads.c')
-rw-r--r-- | threads.c | 322 |
1 files changed, 322 insertions, 0 deletions
diff --git a/threads.c b/threads.c new file mode 100644 index 0000000..2fd4bbb --- /dev/null +++ b/threads.c @@ -0,0 +1,322 @@ +/* threads.c + * + * This file implements threading support helpers (and maybe the thread object) + * for rsyslog. + * + * File begun on 2007-12-14 by RGerhards + * + * Copyright 2007-2016 Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" + +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <errno.h> +#include <pthread.h> +#include <assert.h> +#ifdef HAVE_SYS_PRCTL_H +# include <sys/prctl.h> +#endif + +#include "rsyslog.h" +#include "dirty.h" +#include "linkedlist.h" +#include "threads.h" +#include "srUtils.h" +#include "errmsg.h" +#include "glbl.h" +#include "unicode-helper.h" +#include "rsconf.h" + +/* linked list of currently-known threads */ +static linkedList_t llThrds; + +/* methods */ + +/* Construct a new thread object + */ +static rsRetVal +thrdConstruct(thrdInfo_t **ppThis) +{ + DEFiRet; + thrdInfo_t *pThis; + + assert(ppThis != NULL); + + CHKmalloc(pThis = calloc(1, sizeof(thrdInfo_t))); + pthread_mutex_init(&pThis->mutThrd, NULL); + pthread_cond_init(&pThis->condThrdTerm, NULL); + *ppThis = pThis; + +finalize_it: + RETiRet; +} + + +/* Destructs a thread object. The object must not be linked to the + * linked list of threads. Please note that the thread should have been + * stopped before. If not, we try to do it. + */ +static rsRetVal thrdDestruct(thrdInfo_t *pThis) +{ + DEFiRet; + assert(pThis != NULL); + + pthread_mutex_lock(&pThis->mutThrd); + if(pThis->bIsActive == 1) { + pthread_mutex_unlock(&pThis->mutThrd); + thrdTerminate(pThis); + } else { + pthread_mutex_unlock(&pThis->mutThrd); + pthread_join(pThis->thrdID, NULL); + } + + /* call cleanup function, if any */ + if(pThis->pAfterRun != NULL) + pThis->pAfterRun(pThis); + + pthread_mutex_destroy(&pThis->mutThrd); + pthread_cond_destroy(&pThis->condThrdTerm); + free(pThis->name); + free(pThis); + + RETiRet; +} + + +/* terminate a thread via the non-cancel interface + * This is a separate function as it involves a bit more of code. + * rgerhads, 2009-10-15 + */ +static rsRetVal +thrdTerminateNonCancel(thrdInfo_t *pThis) +{ + struct timespec tTimeout; + int ret; + int was_active; + DEFiRet; + assert(pThis != NULL); + + DBGPRINTF("request term via SIGTTIN for input thread '%s' %p\n", + pThis->name, (void*) pThis->thrdID); + + pThis->bShallStop = RSTRUE; + d_pthread_mutex_lock(&pThis->mutThrd); + timeoutComp(&tTimeout, runConf->globals.inputTimeoutShutdown); + was_active = pThis->bIsActive; + while(was_active) { + if(dbgTimeoutToStderr) { + fprintf(stderr, "rsyslogd debug: info: trying to cooperatively stop " + "input %s, timeout %d ms\n", pThis->name, runConf->globals.inputTimeoutShutdown); + } + DBGPRINTF("thread %s: initiating termination, timeout %d ms\n", + pThis->name, runConf->globals.inputTimeoutShutdown); + const int r = pthread_kill(pThis->thrdID, SIGTTIN); + if(r != 0) { + LogError(r, RS_RET_INTERNAL_ERROR, "error terminating thread %s " + "this may cause shutdown issues", pThis->name); + } + ret = d_pthread_cond_timedwait(&pThis->condThrdTerm, &pThis->mutThrd, &tTimeout); + if(ret == ETIMEDOUT) { + DBGPRINTF("input thread term: timeout expired waiting on thread %s " + "termination - canceling\n", pThis->name); + if(dbgTimeoutToStderr) { + fprintf(stderr, "rsyslogd debug: input thread term: " + "timeout expired waiting on thread %s " + "termination - canceling\n", pThis->name); + } + pthread_cancel(pThis->thrdID); + break; + } else if(ret != 0) { + char errStr[1024]; + int err = ret; + rs_strerror_r(err, errStr, sizeof(errStr)); + DBGPRINTF("input thread term: cond_wait returned with error %d: %s\n", + err, errStr); + } + was_active = pThis->bIsActive; + } + d_pthread_mutex_unlock(&pThis->mutThrd); + + if(was_active) { + DBGPRINTF("non-cancel input thread termination FAILED for thread %s %p\n", + pThis->name, (void*) pThis->thrdID); + } else { + DBGPRINTF("non-cancel input thread termination succeeded for thread %s %p\n", + pThis->name, (void*) pThis->thrdID); + } + + RETiRet; +} + + +/* terminate a thread gracefully. + */ +rsRetVal thrdTerminate(thrdInfo_t *pThis) +{ + DEFiRet; + assert(pThis != NULL); + + if(pThis->bNeedsCancel) { + DBGPRINTF("request term via canceling for input thread %s\n", pThis->name); + if(dbgTimeoutToStderr) { + fprintf(stderr, "rsyslogd debug: request term via canceling for " + "input thread %s\n", pThis->name); + } + pthread_cancel(pThis->thrdID); + } else { + thrdTerminateNonCancel(pThis); + } + pthread_join(pThis->thrdID, NULL); /* wait for input thread to complete */ + + RETiRet; +} + + +/* terminate all known threads gracefully. + */ +rsRetVal thrdTerminateAll(void) +{ + DEFiRet; + llDestroy(&llThrds); + RETiRet; +} + + +/* This is an internal wrapper around the user thread function. Its + * purpose is to handle all the necessary housekeeping stuff so that the + * user function needs not to be aware of the threading calls. The user + * function call has just "normal", non-threading semantics. + * rgerhards, 2007-12-17 + */ +static ATTR_NORETURN void* +thrdStarter(void *const arg) +{ + DEFiRet; + thrdInfo_t *const pThis = (thrdInfo_t*) arg; +# if defined(HAVE_PRCTL) && defined(PR_SET_NAME) + uchar thrdName[32] = "in:"; +# endif + + assert(pThis != NULL); + assert(pThis->pUsrThrdMain != NULL); + +# if defined(HAVE_PRCTL) && defined(PR_SET_NAME) + ustrncpy(thrdName+3, pThis->name, 20); + dbgOutputTID((char*)thrdName); + + /* set thread name - we ignore if the call fails, has no harsh consequences... */ + if(prctl(PR_SET_NAME, thrdName, 0, 0, 0) != 0) { + DBGPRINTF("prctl failed, not setting thread name for '%s'\n", pThis->name); + } else { + DBGPRINTF("set thread name to '%s'\n", thrdName); + } +# endif + + /* block all signals except SIGTTIN and SIGSEGV */ + sigset_t sigSet; + sigfillset(&sigSet); + sigdelset(&sigSet, SIGTTIN); + sigdelset(&sigSet, SIGSEGV); + pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + + /* setup complete, we are now ready to execute the user code. We will not + * regain control until the user code is finished, in which case we terminate + * the thread. + */ + iRet = pThis->pUsrThrdMain(pThis); + + if(iRet == RS_RET_OK) { + dbgprintf("thrdStarter: usrThrdMain %s - 0x%lx returned with iRet %d, exiting now.\n", + pThis->name, (unsigned long) pThis->thrdID, iRet); + } else { + LogError(0, iRet, "main thread of %s terminated abnormally", pThis->name); + } + + /* signal master control that we exit (we do the mutex lock mostly to + * keep the thread debugger happer, it would not really be necessary with + * the logic we employ...) + */ + d_pthread_mutex_lock(&pThis->mutThrd); + pThis->bIsActive = 0; + pthread_cond_signal(&pThis->condThrdTerm); + d_pthread_mutex_unlock(&pThis->mutThrd); + + pthread_exit(0); +} +/* Start a new thread and add it to the list of currently + * executing threads. It is added at the end of the list. + * rgerhards, 2007-12-14 + */ +rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), rsRetVal(*afterRun)(thrdInfo_t *), + sbool bNeedsCancel, uchar *name) +{ + DEFiRet; + thrdInfo_t *pThis; +#if defined (_AIX) + pthread_attr_t aix_attr; +#endif + + assert(thrdMain != NULL); + + CHKiRet(thrdConstruct(&pThis)); + pThis->bIsActive = 1; + pThis->pUsrThrdMain = thrdMain; + pThis->pAfterRun = afterRun; + pThis->bNeedsCancel = bNeedsCancel; + pThis->name = ustrdup(name); +#if defined (_AIX) + pthread_attr_init(&aix_attr); + pthread_attr_setstacksize(&aix_attr, 4096*512); + pthread_create(&pThis->thrdID, &aix_attr, thrdStarter, pThis); +#else + pthread_create(&pThis->thrdID, &default_thread_attr, thrdStarter, pThis); +#endif + CHKiRet(llAppend(&llThrds, NULL, pThis)); + +finalize_it: + RETiRet; +} + + +/* initialize the thread-support subsystem + * must be called once at the start of the program + */ +rsRetVal thrdInit(void) +{ + DEFiRet; + iRet = llInit(&llThrds, thrdDestruct, NULL, NULL); + RETiRet; +} + + +/* de-initialize the thread subsystem + * must be called once at the end of the program + */ +rsRetVal thrdExit(void) +{ + DEFiRet; + iRet = llDestroy(&llThrds); + RETiRet; +} + + +/* vi:set ai: + */ |