diff options
Diffstat (limited to 'src/libnetdata/threads/threads.c')
-rw-r--r-- | src/libnetdata/threads/threads.c | 437 |
1 files changed, 437 insertions, 0 deletions
diff --git a/src/libnetdata/threads/threads.c b/src/libnetdata/threads/threads.c new file mode 100644 index 00000000..0e12d173 --- /dev/null +++ b/src/libnetdata/threads/threads.c @@ -0,0 +1,437 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "../libnetdata.h" + +#define nd_thread_status_get(nti) __atomic_load_n(&((nti)->options), __ATOMIC_ACQUIRE) +#define nd_thread_status_check(nti, flag) (__atomic_load_n(&((nti)->options), __ATOMIC_ACQUIRE) & (flag)) +#define nd_thread_status_set(nti, flag) __atomic_or_fetch(&((nti)->options), flag, __ATOMIC_RELEASE) +#define nd_thread_status_clear(nti, flag) __atomic_and_fetch(&((nti)->options), ~(flag), __ATOMIC_RELEASE) + +typedef void (*nd_thread_canceller)(void *data); + +struct nd_thread { + void *arg; + pid_t tid; + char tag[ND_THREAD_TAG_MAX + 1]; + void *ret; // the return value of start routine + void *(*start_routine) (void *); + NETDATA_THREAD_OPTIONS options; + pthread_t thread; + bool cancel_atomic; + +#ifdef NETDATA_INTERNAL_CHECKS + // keep track of the locks currently held + // used to detect locks that are left locked during exit + int rwlocks_read_locks; + int rwlocks_write_locks; + int mutex_locks; + int spinlock_locks; + int rwspinlock_read_locks; + int rwspinlock_write_locks; +#endif + + struct { + SPINLOCK spinlock; + nd_thread_canceller cb; + void *data; + } canceller; + + struct nd_thread *prev, *next; +}; + +static struct { + struct { + SPINLOCK spinlock; + ND_THREAD *list; + } exited; + + struct { + SPINLOCK spinlock; + ND_THREAD *list; + } running; + + pthread_attr_t *attr; +} threads_globals = { + .exited = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .list = NULL, + }, + .running = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .list = NULL, + }, + .attr = NULL, +}; + +static __thread ND_THREAD *_nd_thread_info = NULL; +static __thread char _nd_thread_os_name[ND_THREAD_TAG_MAX + 1] = ""; + +// -------------------------------------------------------------------------------------------------------------------- +// O/S abstraction + +// get the thread name from the operating system +static inline void os_get_thread_name(char *out, size_t size) { +#if defined(__FreeBSD__) + pthread_get_name_np(pthread_self(), out, size); + if(strcmp(_nd_thread_os_name, "netdata") == 0) + strncpyz(out, "MAIN", size - 1); +#elif defined(HAVE_PTHREAD_GETNAME_NP) + pthread_getname_np(pthread_self(), out, size - 1); + if(strcmp(out, "netdata") == 0) + strncpyz(out, "MAIN", size - 1); +#else + strncpyz(out, "MAIN", size - 1); +#endif +} + +// set the thread name to the operating system +static inline void os_set_thread_name(const char *name) { +#if defined(__FreeBSD__) + pthread_set_name_np(pthread_self(), name); +#elif defined(__APPLE__) + pthread_setname_np(name); +#else + pthread_setname_np(pthread_self(), name); +#endif +} + +// -------------------------------------------------------------------------------------------------------------------- +// internal API for managing names + +inline int nd_thread_has_tag(void) { + return (_nd_thread_info && _nd_thread_info->tag[0]); +} + +// For threads created by netdata, return the tag of the thread. +// For threads created by others (libuv, webrtc, etc), return the tag of the operating system. +// This caches the response, so that it won't query the operating system multiple times. +static inline const char *nd_thread_get_name(bool recheck) { + if(nd_thread_has_tag()) + return _nd_thread_info->tag; + + if(!recheck && _nd_thread_os_name[0]) + return _nd_thread_os_name; + + os_get_thread_name(_nd_thread_os_name, sizeof(_nd_thread_os_name)); + + return _nd_thread_os_name; +} + +const char *nd_thread_tag(void) { + return nd_thread_get_name(false); +} + +void nd_thread_tag_set(const char *tag) { + if(!tag || !*tag) return; + + if(_nd_thread_info) + strncpyz(_nd_thread_info->tag, tag, sizeof(_nd_thread_info->tag) - 1); + + strncpyz(_nd_thread_os_name, tag, sizeof(_nd_thread_os_name) - 1); + + os_set_thread_name(_nd_thread_os_name); +} + +// -------------------------------------------------------------------------------------------------------------------- + +static __thread bool libuv_name_set = false; +void uv_thread_set_name_np(const char* name) { + if(libuv_name_set) return; + + strncpyz(_nd_thread_os_name, name, sizeof(_nd_thread_os_name) - 1); + os_set_thread_name(_nd_thread_os_name); + libuv_name_set = true; +} + +// -------------------------------------------------------------------------------------------------------------------- + +static size_t webrtc_id = 0; +static __thread bool webrtc_name_set = false; +void webrtc_set_thread_name(void) { + if(_nd_thread_info || webrtc_name_set) return; + + webrtc_name_set = true; + + char tmp[ND_THREAD_TAG_MAX + 1] = ""; + os_get_thread_name(tmp, sizeof(tmp)); + + if(!tmp[0] || strcmp(tmp, "netdata") == 0) { + char name[ND_THREAD_TAG_MAX + 1]; + snprintfz(name, ND_THREAD_TAG_MAX, "WEBRTC[%zu]", __atomic_fetch_add(&webrtc_id, 1, __ATOMIC_RELAXED)); + os_set_thread_name(name); + } + + nd_thread_get_name(true); +} + +// -------------------------------------------------------------------------------------------------------------------- +// locks tracking + +#ifdef NETDATA_INTERNAL_CHECKS +void nd_thread_rwlock_read_locked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_read_locks++; } +void nd_thread_rwlock_read_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_read_locks--; } +void nd_thread_rwlock_write_locked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_write_locks++; } +void nd_thread_rwlock_write_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_write_locks--; } +void nd_thread_mutex_locked(void) { if(_nd_thread_info) _nd_thread_info->mutex_locks++; } +void nd_thread_mutex_unlocked(void) { if(_nd_thread_info) _nd_thread_info->mutex_locks--; } +void nd_thread_spinlock_locked(void) { if(_nd_thread_info) _nd_thread_info->spinlock_locks++; } +void nd_thread_spinlock_unlocked(void) { if(_nd_thread_info) _nd_thread_info->spinlock_locks--; } +void nd_thread_rwspinlock_read_locked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_read_locks++; } +void nd_thread_rwspinlock_read_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_read_locks--; } +void nd_thread_rwspinlock_write_locked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_write_locks++; } +void nd_thread_rwspinlock_write_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_write_locks--; } +#endif + +// -------------------------------------------------------------------------------------------------------------------- +// early initialization + +size_t netdata_threads_init(void) { + int i; + + if(!threads_globals.attr) { + threads_globals.attr = callocz(1, sizeof(pthread_attr_t)); + i = pthread_attr_init(threads_globals.attr); + if (i != 0) + fatal("pthread_attr_init() failed with code %d.", i); + } + + // get the required stack size of the threads of netdata + size_t stacksize = 0; + i = pthread_attr_getstacksize(threads_globals.attr, &stacksize); + if(i != 0) + fatal("pthread_attr_getstacksize() failed with code %d.", i); + + return stacksize; +} + +// ---------------------------------------------------------------------------- +// late initialization + +void netdata_threads_init_after_fork(size_t stacksize) { + int i; + + // set pthread stack size + if(threads_globals.attr && stacksize > (size_t)PTHREAD_STACK_MIN) { + i = pthread_attr_setstacksize(threads_globals.attr, stacksize); + if(i != 0) + nd_log(NDLS_DAEMON, NDLP_WARNING, "pthread_attr_setstacksize() to %zu bytes, failed with code %d.", stacksize, i); + else + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Set threads stack size to %zu bytes", stacksize); + } + else + nd_log(NDLS_DAEMON, NDLP_WARNING, "Invalid pthread stacksize %zu", stacksize); +} + +// ---------------------------------------------------------------------------- +// threads init for external plugins + +void netdata_threads_init_for_external_plugins(size_t stacksize) { + size_t default_stacksize = netdata_threads_init(); + if(default_stacksize < 1 * 1024 * 1024) + default_stacksize = 1 * 1024 * 1024; + + netdata_threads_init_after_fork(stacksize ? stacksize : default_stacksize); +} + +// ---------------------------------------------------------------------------- + +void rrdset_thread_rda_free(void); +void sender_thread_buffer_free(void); +void query_target_free(void); +void service_exits(void); +void rrd_collector_finished(void); + +static void nd_thread_join_exited_detached_threads(void) { + while(1) { + spinlock_lock(&threads_globals.exited.spinlock); + + ND_THREAD *nti = threads_globals.exited.list; + while (nti && nd_thread_status_check(nti, NETDATA_THREAD_OPTION_JOINABLE) == 0) + nti = nti->next; + + if(nti) + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(threads_globals.exited.list, nti, prev, next); + + spinlock_unlock(&threads_globals.exited.spinlock); + + if(nti) { + nd_log(NDLS_DAEMON, NDLP_INFO, "Joining detached thread '%s', tid %d", nti->tag, nti->tid); + nd_thread_join(nti); + } + else + break; + } +} + +static void nd_thread_exit(void *pptr) { + ND_THREAD *nti = CLEANUP_FUNCTION_GET_PTR(pptr); + + if(nti != _nd_thread_info || !nti || !_nd_thread_info) { + nd_log(NDLS_DAEMON, NDLP_ERR, + "THREADS: internal error - thread local variable does not match the one passed to this function. " + "Expected thread '%s', passed thread '%s'", + _nd_thread_info ? _nd_thread_info->tag : "(null)", nti ? nti->tag : "(null)"); + + if(!nti) nti = _nd_thread_info; + } + + if(!nti) return; + + internal_fatal(nti->rwlocks_read_locks != 0, + "THREAD '%s' WITH PID %d HAS %d RWLOCKS READ ACQUIRED WHILE EXITING !!!", + (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwlocks_read_locks); + + internal_fatal(nti->rwlocks_write_locks != 0, + "THREAD '%s' WITH PID %d HAS %d RWLOCKS WRITE ACQUIRED WHILE EXITING !!!", + (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwlocks_write_locks); + + internal_fatal(nti->mutex_locks != 0, + "THREAD '%s' WITH PID %d HAS %d MUTEXES ACQUIRED WHILE EXITING !!!", + (nti) ? nti->tag : "(unset)", gettid_cached(), nti->mutex_locks); + + internal_fatal(nti->spinlock_locks != 0, + "THREAD '%s' WITH PID %d HAS %d SPINLOCKS ACQUIRED WHILE EXITING !!!", + (nti) ? nti->tag : "(unset)", gettid_cached(), nti->spinlock_locks); + + internal_fatal(nti->rwspinlock_read_locks != 0, + "THREAD '%s' WITH PID %d HAS %d RWSPINLOCKS READ ACQUIRED WHILE EXITING !!!", + (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwspinlock_read_locks); + + internal_fatal(nti->rwspinlock_write_locks != 0, + "THREAD '%s' WITH PID %d HAS %d RWSPINLOCKS WRITE ACQUIRED WHILE EXITING !!!", + (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwspinlock_write_locks); + + if(nd_thread_status_check(nti, NETDATA_THREAD_OPTION_DONT_LOG_CLEANUP) != NETDATA_THREAD_OPTION_DONT_LOG_CLEANUP) + nd_log(NDLS_DAEMON, NDLP_DEBUG, "thread with task id %d finished", nti->tid); + + rrd_collector_finished(); + sender_thread_buffer_free(); + rrdset_thread_rda_free(); + query_target_free(); + thread_cache_destroy(); + service_exits(); + worker_unregister(); + + nd_thread_status_set(nti, NETDATA_THREAD_STATUS_FINISHED); + + spinlock_lock(&threads_globals.running.spinlock); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next); + spinlock_unlock(&threads_globals.running.spinlock); + + if (nd_thread_status_check(nti, NETDATA_THREAD_OPTION_JOINABLE) != NETDATA_THREAD_OPTION_JOINABLE) { + spinlock_lock(&threads_globals.exited.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.exited.list, nti, prev, next); + spinlock_unlock(&threads_globals.exited.spinlock); + } +} + +static void *nd_thread_starting_point(void *ptr) { + ND_THREAD *nti = _nd_thread_info = (ND_THREAD *)ptr; + nd_thread_status_set(nti, NETDATA_THREAD_STATUS_STARTED); + + nti->tid = gettid_cached(); + nd_thread_tag_set(nti->tag); + + if(nd_thread_status_check(nti, NETDATA_THREAD_OPTION_DONT_LOG_STARTUP) != NETDATA_THREAD_OPTION_DONT_LOG_STARTUP) + nd_log(NDLS_DAEMON, NDLP_DEBUG, "thread created with task id %d", gettid_cached()); + + if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) + nd_log(NDLS_DAEMON, NDLP_WARNING, "cannot set pthread cancel type to DEFERRED."); + + if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) + nd_log(NDLS_DAEMON, NDLP_WARNING, "cannot set pthread cancel state to ENABLE."); + + CLEANUP_FUNCTION_REGISTER(nd_thread_exit) cleanup_ptr = nti; + + // run the thread code + nti->ret = nti->start_routine(nti->arg); + + return nti; +} + +ND_THREAD *nd_thread_self(void) { + return _nd_thread_info; +} + +bool nd_thread_is_me(ND_THREAD *nti) { + return nti && nti->thread == pthread_self(); +} + +ND_THREAD *nd_thread_create(const char *tag, NETDATA_THREAD_OPTIONS options, void *(*start_routine)(void *), void *arg) { + nd_thread_join_exited_detached_threads(); + + ND_THREAD *nti = callocz(1, sizeof(*nti)); + spinlock_init(&nti->canceller.spinlock); + nti->arg = arg; + nti->start_routine = start_routine; + nti->options = options & NETDATA_THREAD_OPTIONS_ALL; + strncpyz(nti->tag, tag, ND_THREAD_TAG_MAX); + + spinlock_lock(&threads_globals.running.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next); + spinlock_unlock(&threads_globals.running.spinlock); + + int ret = pthread_create(&nti->thread, threads_globals.attr, nd_thread_starting_point, nti); + if(ret != 0) { + nd_log(NDLS_DAEMON, NDLP_ERR, + "failed to create new thread for %s. pthread_create() failed with code %d", + tag, ret); + + spinlock_lock(&threads_globals.running.spinlock); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next); + spinlock_unlock(&threads_globals.running.spinlock); + freez(nti); + return NULL; + } + + return nti; +} + +// -------------------------------------------------------------------------------------------------------------------- + +void nd_thread_register_canceller(nd_thread_canceller cb, void *data) { + ND_THREAD *nti = _nd_thread_info; + if(!nti) return; + + spinlock_lock(&nti->canceller.spinlock); + nti->canceller.cb = cb; + nti->canceller.data = data; + spinlock_unlock(&nti->canceller.spinlock); +} + +void nd_thread_signal_cancel(ND_THREAD *nti) { + if(!nti) return; + + __atomic_store_n(&nti->cancel_atomic, true, __ATOMIC_RELAXED); + + spinlock_lock(&nti->canceller.spinlock); + if(nti->canceller.cb) + nti->canceller.cb(nti->canceller.data); + spinlock_unlock(&nti->canceller.spinlock); +} + +bool nd_thread_signaled_to_cancel(void) { + if(!_nd_thread_info) return false; + return __atomic_load_n(&_nd_thread_info->cancel_atomic, __ATOMIC_RELAXED); +} + +// ---------------------------------------------------------------------------- +// nd_thread_join + +void nd_thread_join(ND_THREAD *nti) { + if(!nti) return; + + int ret = pthread_join(nti->thread, NULL); + if(ret != 0) + nd_log(NDLS_DAEMON, NDLP_WARNING, "cannot join thread. pthread_join() failed with code %d.", ret); + else { + nd_thread_status_set(nti, NETDATA_THREAD_STATUS_JOINED); + + spinlock_lock(&threads_globals.exited.spinlock); + if(nti->prev) + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.exited.list, nti, prev, next); + spinlock_unlock(&threads_globals.exited.spinlock); + + freez(nti); + } +} |