// SPDX-License-Identifier: GPL-3.0-or-later #include "../libnetdata.h" #define nd_thread_status_get(nti) __atomic_load_n(&((nti)->options), __ATOMIC_ACQUIRE) #define nd_thread_status_check(nti, flag) (__atomic_load_n(&((nti)->options), __ATOMIC_ACQUIRE) & (flag)) #define nd_thread_status_set(nti, flag) __atomic_or_fetch(&((nti)->options), flag, __ATOMIC_RELEASE) #define nd_thread_status_clear(nti, flag) __atomic_and_fetch(&((nti)->options), ~(flag), __ATOMIC_RELEASE) typedef void (*nd_thread_canceller)(void *data); struct nd_thread { void *arg; pid_t tid; char tag[ND_THREAD_TAG_MAX + 1]; void *ret; // the return value of start routine void *(*start_routine) (void *); NETDATA_THREAD_OPTIONS options; pthread_t thread; bool cancel_atomic; #ifdef NETDATA_INTERNAL_CHECKS // keep track of the locks currently held // used to detect locks that are left locked during exit int rwlocks_read_locks; int rwlocks_write_locks; int mutex_locks; int spinlock_locks; int rwspinlock_read_locks; int rwspinlock_write_locks; #endif struct { SPINLOCK spinlock; nd_thread_canceller cb; void *data; } canceller; struct nd_thread *prev, *next; }; static struct { struct { SPINLOCK spinlock; ND_THREAD *list; } exited; struct { SPINLOCK spinlock; ND_THREAD *list; } running; pthread_attr_t *attr; } threads_globals = { .exited = { .spinlock = NETDATA_SPINLOCK_INITIALIZER, .list = NULL, }, .running = { .spinlock = NETDATA_SPINLOCK_INITIALIZER, .list = NULL, }, .attr = NULL, }; static __thread ND_THREAD *_nd_thread_info = NULL; static __thread char _nd_thread_os_name[ND_THREAD_TAG_MAX + 1] = ""; // -------------------------------------------------------------------------------------------------------------------- // O/S abstraction // get the thread name from the operating system static inline void os_get_thread_name(char *out, size_t size) { #if defined(__FreeBSD__) pthread_get_name_np(pthread_self(), out, size); if(strcmp(_nd_thread_os_name, "netdata") == 0) strncpyz(out, "MAIN", size - 1); #elif defined(HAVE_PTHREAD_GETNAME_NP) pthread_getname_np(pthread_self(), out, size - 1); if(strcmp(out, "netdata") == 0) strncpyz(out, "MAIN", size - 1); #else strncpyz(out, "MAIN", size - 1); #endif } // set the thread name to the operating system static inline void os_set_thread_name(const char *name) { #if defined(__FreeBSD__) pthread_set_name_np(pthread_self(), name); #elif defined(__APPLE__) pthread_setname_np(name); #else pthread_setname_np(pthread_self(), name); #endif } // -------------------------------------------------------------------------------------------------------------------- // internal API for managing names inline int nd_thread_has_tag(void) { return (_nd_thread_info && _nd_thread_info->tag[0]); } // For threads created by netdata, return the tag of the thread. // For threads created by others (libuv, webrtc, etc), return the tag of the operating system. // This caches the response, so that it won't query the operating system multiple times. static inline const char *nd_thread_get_name(bool recheck) { if(nd_thread_has_tag()) return _nd_thread_info->tag; if(!recheck && _nd_thread_os_name[0]) return _nd_thread_os_name; os_get_thread_name(_nd_thread_os_name, sizeof(_nd_thread_os_name)); return _nd_thread_os_name; } const char *nd_thread_tag(void) { return nd_thread_get_name(false); } void nd_thread_tag_set(const char *tag) { if(!tag || !*tag) return; if(_nd_thread_info) strncpyz(_nd_thread_info->tag, tag, sizeof(_nd_thread_info->tag) - 1); strncpyz(_nd_thread_os_name, tag, sizeof(_nd_thread_os_name) - 1); os_set_thread_name(_nd_thread_os_name); } // -------------------------------------------------------------------------------------------------------------------- static __thread bool libuv_name_set = false; void uv_thread_set_name_np(const char* name) { if(libuv_name_set) return; strncpyz(_nd_thread_os_name, name, sizeof(_nd_thread_os_name) - 1); os_set_thread_name(_nd_thread_os_name); libuv_name_set = true; } // -------------------------------------------------------------------------------------------------------------------- static size_t webrtc_id = 0; static __thread bool webrtc_name_set = false; void webrtc_set_thread_name(void) { if(_nd_thread_info || webrtc_name_set) return; webrtc_name_set = true; char tmp[ND_THREAD_TAG_MAX + 1] = ""; os_get_thread_name(tmp, sizeof(tmp)); if(!tmp[0] || strcmp(tmp, "netdata") == 0) { char name[ND_THREAD_TAG_MAX + 1]; snprintfz(name, ND_THREAD_TAG_MAX, "WEBRTC[%zu]", __atomic_fetch_add(&webrtc_id, 1, __ATOMIC_RELAXED)); os_set_thread_name(name); } nd_thread_get_name(true); } // -------------------------------------------------------------------------------------------------------------------- // locks tracking #ifdef NETDATA_INTERNAL_CHECKS void nd_thread_rwlock_read_locked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_read_locks++; } void nd_thread_rwlock_read_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_read_locks--; } void nd_thread_rwlock_write_locked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_write_locks++; } void nd_thread_rwlock_write_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_write_locks--; } void nd_thread_mutex_locked(void) { if(_nd_thread_info) _nd_thread_info->mutex_locks++; } void nd_thread_mutex_unlocked(void) { if(_nd_thread_info) _nd_thread_info->mutex_locks--; } void nd_thread_spinlock_locked(void) { if(_nd_thread_info) _nd_thread_info->spinlock_locks++; } void nd_thread_spinlock_unlocked(void) { if(_nd_thread_info) _nd_thread_info->spinlock_locks--; } void nd_thread_rwspinlock_read_locked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_read_locks++; } void nd_thread_rwspinlock_read_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_read_locks--; } void nd_thread_rwspinlock_write_locked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_write_locks++; } void nd_thread_rwspinlock_write_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_write_locks--; } #endif // -------------------------------------------------------------------------------------------------------------------- // early initialization size_t netdata_threads_init(void) { int i; if(!threads_globals.attr) { threads_globals.attr = callocz(1, sizeof(pthread_attr_t)); i = pthread_attr_init(threads_globals.attr); if (i != 0) fatal("pthread_attr_init() failed with code %d.", i); } // get the required stack size of the threads of netdata size_t stacksize = 0; i = pthread_attr_getstacksize(threads_globals.attr, &stacksize); if(i != 0) fatal("pthread_attr_getstacksize() failed with code %d.", i); return stacksize; } // ---------------------------------------------------------------------------- // late initialization void netdata_threads_init_after_fork(size_t stacksize) { int i; // set pthread stack size if(threads_globals.attr && stacksize > (size_t)PTHREAD_STACK_MIN) { i = pthread_attr_setstacksize(threads_globals.attr, stacksize); if(i != 0) nd_log(NDLS_DAEMON, NDLP_WARNING, "pthread_attr_setstacksize() to %zu bytes, failed with code %d.", stacksize, i); else nd_log(NDLS_DAEMON, NDLP_DEBUG, "Set threads stack size to %zu bytes", stacksize); } else nd_log(NDLS_DAEMON, NDLP_WARNING, "Invalid pthread stacksize %zu", stacksize); } // ---------------------------------------------------------------------------- // threads init for external plugins void netdata_threads_init_for_external_plugins(size_t stacksize) { size_t default_stacksize = netdata_threads_init(); if(default_stacksize < 1 * 1024 * 1024) default_stacksize = 1 * 1024 * 1024; netdata_threads_init_after_fork(stacksize ? stacksize : default_stacksize); } // ---------------------------------------------------------------------------- void rrdset_thread_rda_free(void); void sender_thread_buffer_free(void); void query_target_free(void); void service_exits(void); void rrd_collector_finished(void); static void nd_thread_join_exited_detached_threads(void) { while(1) { spinlock_lock(&threads_globals.exited.spinlock); ND_THREAD *nti = threads_globals.exited.list; while (nti && nd_thread_status_check(nti, NETDATA_THREAD_OPTION_JOINABLE) == 0) nti = nti->next; if(nti) DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(threads_globals.exited.list, nti, prev, next); spinlock_unlock(&threads_globals.exited.spinlock); if(nti) { nd_log(NDLS_DAEMON, NDLP_INFO, "Joining detached thread '%s', tid %d", nti->tag, nti->tid); nd_thread_join(nti); } else break; } } static void nd_thread_exit(void *pptr) { ND_THREAD *nti = CLEANUP_FUNCTION_GET_PTR(pptr); if(nti != _nd_thread_info || !nti || !_nd_thread_info) { nd_log(NDLS_DAEMON, NDLP_ERR, "THREADS: internal error - thread local variable does not match the one passed to this function. " "Expected thread '%s', passed thread '%s'", _nd_thread_info ? _nd_thread_info->tag : "(null)", nti ? nti->tag : "(null)"); if(!nti) nti = _nd_thread_info; } if(!nti) return; internal_fatal(nti->rwlocks_read_locks != 0, "THREAD '%s' WITH PID %d HAS %d RWLOCKS READ ACQUIRED WHILE EXITING !!!", (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwlocks_read_locks); internal_fatal(nti->rwlocks_write_locks != 0, "THREAD '%s' WITH PID %d HAS %d RWLOCKS WRITE ACQUIRED WHILE EXITING !!!", (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwlocks_write_locks); internal_fatal(nti->mutex_locks != 0, "THREAD '%s' WITH PID %d HAS %d MUTEXES ACQUIRED WHILE EXITING !!!", (nti) ? nti->tag : "(unset)", gettid_cached(), nti->mutex_locks); internal_fatal(nti->spinlock_locks != 0, "THREAD '%s' WITH PID %d HAS %d SPINLOCKS ACQUIRED WHILE EXITING !!!", (nti) ? nti->tag : "(unset)", gettid_cached(), nti->spinlock_locks); internal_fatal(nti->rwspinlock_read_locks != 0, "THREAD '%s' WITH PID %d HAS %d RWSPINLOCKS READ ACQUIRED WHILE EXITING !!!", (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwspinlock_read_locks); internal_fatal(nti->rwspinlock_write_locks != 0, "THREAD '%s' WITH PID %d HAS %d RWSPINLOCKS WRITE ACQUIRED WHILE EXITING !!!", (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwspinlock_write_locks); if(nd_thread_status_check(nti, NETDATA_THREAD_OPTION_DONT_LOG_CLEANUP) != NETDATA_THREAD_OPTION_DONT_LOG_CLEANUP) nd_log(NDLS_DAEMON, NDLP_DEBUG, "thread with task id %d finished", nti->tid); rrd_collector_finished(); sender_thread_buffer_free(); rrdset_thread_rda_free(); query_target_free(); thread_cache_destroy(); service_exits(); worker_unregister(); nd_thread_status_set(nti, NETDATA_THREAD_STATUS_FINISHED); spinlock_lock(&threads_globals.running.spinlock); DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next); spinlock_unlock(&threads_globals.running.spinlock); if (nd_thread_status_check(nti, NETDATA_THREAD_OPTION_JOINABLE) != NETDATA_THREAD_OPTION_JOINABLE) { spinlock_lock(&threads_globals.exited.spinlock); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.exited.list, nti, prev, next); spinlock_unlock(&threads_globals.exited.spinlock); } } static void *nd_thread_starting_point(void *ptr) { ND_THREAD *nti = _nd_thread_info = (ND_THREAD *)ptr; nd_thread_status_set(nti, NETDATA_THREAD_STATUS_STARTED); nti->tid = gettid_cached(); nd_thread_tag_set(nti->tag); if(nd_thread_status_check(nti, NETDATA_THREAD_OPTION_DONT_LOG_STARTUP) != NETDATA_THREAD_OPTION_DONT_LOG_STARTUP) nd_log(NDLS_DAEMON, NDLP_DEBUG, "thread created with task id %d", gettid_cached()); if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) nd_log(NDLS_DAEMON, NDLP_WARNING, "cannot set pthread cancel type to DEFERRED."); if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) nd_log(NDLS_DAEMON, NDLP_WARNING, "cannot set pthread cancel state to ENABLE."); CLEANUP_FUNCTION_REGISTER(nd_thread_exit) cleanup_ptr = nti; // run the thread code nti->ret = nti->start_routine(nti->arg); return nti; } ND_THREAD *nd_thread_self(void) { return _nd_thread_info; } bool nd_thread_is_me(ND_THREAD *nti) { return nti && nti->thread == pthread_self(); } ND_THREAD *nd_thread_create(const char *tag, NETDATA_THREAD_OPTIONS options, void *(*start_routine)(void *), void *arg) { nd_thread_join_exited_detached_threads(); ND_THREAD *nti = callocz(1, sizeof(*nti)); spinlock_init(&nti->canceller.spinlock); nti->arg = arg; nti->start_routine = start_routine; nti->options = options & NETDATA_THREAD_OPTIONS_ALL; strncpyz(nti->tag, tag, ND_THREAD_TAG_MAX); spinlock_lock(&threads_globals.running.spinlock); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next); spinlock_unlock(&threads_globals.running.spinlock); int ret = pthread_create(&nti->thread, threads_globals.attr, nd_thread_starting_point, nti); if(ret != 0) { nd_log(NDLS_DAEMON, NDLP_ERR, "failed to create new thread for %s. pthread_create() failed with code %d", tag, ret); spinlock_lock(&threads_globals.running.spinlock); DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next); spinlock_unlock(&threads_globals.running.spinlock); freez(nti); return NULL; } return nti; } // -------------------------------------------------------------------------------------------------------------------- void nd_thread_register_canceller(nd_thread_canceller cb, void *data) { ND_THREAD *nti = _nd_thread_info; if(!nti) return; spinlock_lock(&nti->canceller.spinlock); nti->canceller.cb = cb; nti->canceller.data = data; spinlock_unlock(&nti->canceller.spinlock); } void nd_thread_signal_cancel(ND_THREAD *nti) { if(!nti) return; __atomic_store_n(&nti->cancel_atomic, true, __ATOMIC_RELAXED); spinlock_lock(&nti->canceller.spinlock); if(nti->canceller.cb) nti->canceller.cb(nti->canceller.data); spinlock_unlock(&nti->canceller.spinlock); } bool nd_thread_signaled_to_cancel(void) { if(!_nd_thread_info) return false; return __atomic_load_n(&_nd_thread_info->cancel_atomic, __ATOMIC_RELAXED); } // ---------------------------------------------------------------------------- // nd_thread_join int nd_thread_join(ND_THREAD *nti) { if(!nti) return ESRCH; int ret = pthread_join(nti->thread, NULL); if(ret != 0) { nd_log(NDLS_DAEMON, NDLP_WARNING, "cannot join thread. pthread_join() failed with code %d. (tag=%s)", ret, nti->tag); } else { nd_thread_status_set(nti, NETDATA_THREAD_STATUS_JOINED); spinlock_lock(&threads_globals.exited.spinlock); if(nti->prev) DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.exited.list, nti, prev, next); spinlock_unlock(&threads_globals.exited.spinlock); freez(nti); } return ret; }