summaryrefslogtreecommitdiffstats
path: root/src/libnetdata/threads/threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libnetdata/threads/threads.c')
-rw-r--r--src/libnetdata/threads/threads.c437
1 files changed, 437 insertions, 0 deletions
diff --git a/src/libnetdata/threads/threads.c b/src/libnetdata/threads/threads.c
new file mode 100644
index 000000000..0e12d173e
--- /dev/null
+++ b/src/libnetdata/threads/threads.c
@@ -0,0 +1,437 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "../libnetdata.h"
+
+#define nd_thread_status_get(nti) __atomic_load_n(&((nti)->options), __ATOMIC_ACQUIRE)
+#define nd_thread_status_check(nti, flag) (__atomic_load_n(&((nti)->options), __ATOMIC_ACQUIRE) & (flag))
+#define nd_thread_status_set(nti, flag) __atomic_or_fetch(&((nti)->options), flag, __ATOMIC_RELEASE)
+#define nd_thread_status_clear(nti, flag) __atomic_and_fetch(&((nti)->options), ~(flag), __ATOMIC_RELEASE)
+
+typedef void (*nd_thread_canceller)(void *data);
+
+struct nd_thread {
+ void *arg;
+ pid_t tid;
+ char tag[ND_THREAD_TAG_MAX + 1];
+ void *ret; // the return value of start routine
+ void *(*start_routine) (void *);
+ NETDATA_THREAD_OPTIONS options;
+ pthread_t thread;
+ bool cancel_atomic;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ // keep track of the locks currently held
+ // used to detect locks that are left locked during exit
+ int rwlocks_read_locks;
+ int rwlocks_write_locks;
+ int mutex_locks;
+ int spinlock_locks;
+ int rwspinlock_read_locks;
+ int rwspinlock_write_locks;
+#endif
+
+ struct {
+ SPINLOCK spinlock;
+ nd_thread_canceller cb;
+ void *data;
+ } canceller;
+
+ struct nd_thread *prev, *next;
+};
+
+static struct {
+ struct {
+ SPINLOCK spinlock;
+ ND_THREAD *list;
+ } exited;
+
+ struct {
+ SPINLOCK spinlock;
+ ND_THREAD *list;
+ } running;
+
+ pthread_attr_t *attr;
+} threads_globals = {
+ .exited = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .list = NULL,
+ },
+ .running = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .list = NULL,
+ },
+ .attr = NULL,
+};
+
+static __thread ND_THREAD *_nd_thread_info = NULL;
+static __thread char _nd_thread_os_name[ND_THREAD_TAG_MAX + 1] = "";
+
+// --------------------------------------------------------------------------------------------------------------------
+// O/S abstraction
+
+// get the thread name from the operating system
+static inline void os_get_thread_name(char *out, size_t size) {
+#if defined(__FreeBSD__)
+ pthread_get_name_np(pthread_self(), out, size);
+ if(strcmp(_nd_thread_os_name, "netdata") == 0)
+ strncpyz(out, "MAIN", size - 1);
+#elif defined(HAVE_PTHREAD_GETNAME_NP)
+ pthread_getname_np(pthread_self(), out, size - 1);
+ if(strcmp(out, "netdata") == 0)
+ strncpyz(out, "MAIN", size - 1);
+#else
+ strncpyz(out, "MAIN", size - 1);
+#endif
+}
+
+// set the thread name to the operating system
+static inline void os_set_thread_name(const char *name) {
+#if defined(__FreeBSD__)
+ pthread_set_name_np(pthread_self(), name);
+#elif defined(__APPLE__)
+ pthread_setname_np(name);
+#else
+ pthread_setname_np(pthread_self(), name);
+#endif
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+// internal API for managing names
+
+inline int nd_thread_has_tag(void) {
+ return (_nd_thread_info && _nd_thread_info->tag[0]);
+}
+
+// For threads created by netdata, return the tag of the thread.
+// For threads created by others (libuv, webrtc, etc), return the tag of the operating system.
+// This caches the response, so that it won't query the operating system multiple times.
+static inline const char *nd_thread_get_name(bool recheck) {
+ if(nd_thread_has_tag())
+ return _nd_thread_info->tag;
+
+ if(!recheck && _nd_thread_os_name[0])
+ return _nd_thread_os_name;
+
+ os_get_thread_name(_nd_thread_os_name, sizeof(_nd_thread_os_name));
+
+ return _nd_thread_os_name;
+}
+
+const char *nd_thread_tag(void) {
+ return nd_thread_get_name(false);
+}
+
+void nd_thread_tag_set(const char *tag) {
+ if(!tag || !*tag) return;
+
+ if(_nd_thread_info)
+ strncpyz(_nd_thread_info->tag, tag, sizeof(_nd_thread_info->tag) - 1);
+
+ strncpyz(_nd_thread_os_name, tag, sizeof(_nd_thread_os_name) - 1);
+
+ os_set_thread_name(_nd_thread_os_name);
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+static __thread bool libuv_name_set = false;
+void uv_thread_set_name_np(const char* name) {
+ if(libuv_name_set) return;
+
+ strncpyz(_nd_thread_os_name, name, sizeof(_nd_thread_os_name) - 1);
+ os_set_thread_name(_nd_thread_os_name);
+ libuv_name_set = true;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+static size_t webrtc_id = 0;
+static __thread bool webrtc_name_set = false;
+void webrtc_set_thread_name(void) {
+ if(_nd_thread_info || webrtc_name_set) return;
+
+ webrtc_name_set = true;
+
+ char tmp[ND_THREAD_TAG_MAX + 1] = "";
+ os_get_thread_name(tmp, sizeof(tmp));
+
+ if(!tmp[0] || strcmp(tmp, "netdata") == 0) {
+ char name[ND_THREAD_TAG_MAX + 1];
+ snprintfz(name, ND_THREAD_TAG_MAX, "WEBRTC[%zu]", __atomic_fetch_add(&webrtc_id, 1, __ATOMIC_RELAXED));
+ os_set_thread_name(name);
+ }
+
+ nd_thread_get_name(true);
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+// locks tracking
+
+#ifdef NETDATA_INTERNAL_CHECKS
+void nd_thread_rwlock_read_locked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_read_locks++; }
+void nd_thread_rwlock_read_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_read_locks--; }
+void nd_thread_rwlock_write_locked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_write_locks++; }
+void nd_thread_rwlock_write_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwlocks_write_locks--; }
+void nd_thread_mutex_locked(void) { if(_nd_thread_info) _nd_thread_info->mutex_locks++; }
+void nd_thread_mutex_unlocked(void) { if(_nd_thread_info) _nd_thread_info->mutex_locks--; }
+void nd_thread_spinlock_locked(void) { if(_nd_thread_info) _nd_thread_info->spinlock_locks++; }
+void nd_thread_spinlock_unlocked(void) { if(_nd_thread_info) _nd_thread_info->spinlock_locks--; }
+void nd_thread_rwspinlock_read_locked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_read_locks++; }
+void nd_thread_rwspinlock_read_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_read_locks--; }
+void nd_thread_rwspinlock_write_locked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_write_locks++; }
+void nd_thread_rwspinlock_write_unlocked(void) { if(_nd_thread_info) _nd_thread_info->rwspinlock_write_locks--; }
+#endif
+
+// --------------------------------------------------------------------------------------------------------------------
+// early initialization
+
+size_t netdata_threads_init(void) {
+ int i;
+
+ if(!threads_globals.attr) {
+ threads_globals.attr = callocz(1, sizeof(pthread_attr_t));
+ i = pthread_attr_init(threads_globals.attr);
+ if (i != 0)
+ fatal("pthread_attr_init() failed with code %d.", i);
+ }
+
+ // get the required stack size of the threads of netdata
+ size_t stacksize = 0;
+ i = pthread_attr_getstacksize(threads_globals.attr, &stacksize);
+ if(i != 0)
+ fatal("pthread_attr_getstacksize() failed with code %d.", i);
+
+ return stacksize;
+}
+
+// ----------------------------------------------------------------------------
+// late initialization
+
+void netdata_threads_init_after_fork(size_t stacksize) {
+ int i;
+
+ // set pthread stack size
+ if(threads_globals.attr && stacksize > (size_t)PTHREAD_STACK_MIN) {
+ i = pthread_attr_setstacksize(threads_globals.attr, stacksize);
+ if(i != 0)
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "pthread_attr_setstacksize() to %zu bytes, failed with code %d.", stacksize, i);
+ else
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Set threads stack size to %zu bytes", stacksize);
+ }
+ else
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "Invalid pthread stacksize %zu", stacksize);
+}
+
+// ----------------------------------------------------------------------------
+// threads init for external plugins
+
+void netdata_threads_init_for_external_plugins(size_t stacksize) {
+ size_t default_stacksize = netdata_threads_init();
+ if(default_stacksize < 1 * 1024 * 1024)
+ default_stacksize = 1 * 1024 * 1024;
+
+ netdata_threads_init_after_fork(stacksize ? stacksize : default_stacksize);
+}
+
+// ----------------------------------------------------------------------------
+
+void rrdset_thread_rda_free(void);
+void sender_thread_buffer_free(void);
+void query_target_free(void);
+void service_exits(void);
+void rrd_collector_finished(void);
+
+static void nd_thread_join_exited_detached_threads(void) {
+ while(1) {
+ spinlock_lock(&threads_globals.exited.spinlock);
+
+ ND_THREAD *nti = threads_globals.exited.list;
+ while (nti && nd_thread_status_check(nti, NETDATA_THREAD_OPTION_JOINABLE) == 0)
+ nti = nti->next;
+
+ if(nti)
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(threads_globals.exited.list, nti, prev, next);
+
+ spinlock_unlock(&threads_globals.exited.spinlock);
+
+ if(nti) {
+ nd_log(NDLS_DAEMON, NDLP_INFO, "Joining detached thread '%s', tid %d", nti->tag, nti->tid);
+ nd_thread_join(nti);
+ }
+ else
+ break;
+ }
+}
+
+static void nd_thread_exit(void *pptr) {
+ ND_THREAD *nti = CLEANUP_FUNCTION_GET_PTR(pptr);
+
+ if(nti != _nd_thread_info || !nti || !_nd_thread_info) {
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "THREADS: internal error - thread local variable does not match the one passed to this function. "
+ "Expected thread '%s', passed thread '%s'",
+ _nd_thread_info ? _nd_thread_info->tag : "(null)", nti ? nti->tag : "(null)");
+
+ if(!nti) nti = _nd_thread_info;
+ }
+
+ if(!nti) return;
+
+ internal_fatal(nti->rwlocks_read_locks != 0,
+ "THREAD '%s' WITH PID %d HAS %d RWLOCKS READ ACQUIRED WHILE EXITING !!!",
+ (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwlocks_read_locks);
+
+ internal_fatal(nti->rwlocks_write_locks != 0,
+ "THREAD '%s' WITH PID %d HAS %d RWLOCKS WRITE ACQUIRED WHILE EXITING !!!",
+ (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwlocks_write_locks);
+
+ internal_fatal(nti->mutex_locks != 0,
+ "THREAD '%s' WITH PID %d HAS %d MUTEXES ACQUIRED WHILE EXITING !!!",
+ (nti) ? nti->tag : "(unset)", gettid_cached(), nti->mutex_locks);
+
+ internal_fatal(nti->spinlock_locks != 0,
+ "THREAD '%s' WITH PID %d HAS %d SPINLOCKS ACQUIRED WHILE EXITING !!!",
+ (nti) ? nti->tag : "(unset)", gettid_cached(), nti->spinlock_locks);
+
+ internal_fatal(nti->rwspinlock_read_locks != 0,
+ "THREAD '%s' WITH PID %d HAS %d RWSPINLOCKS READ ACQUIRED WHILE EXITING !!!",
+ (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwspinlock_read_locks);
+
+ internal_fatal(nti->rwspinlock_write_locks != 0,
+ "THREAD '%s' WITH PID %d HAS %d RWSPINLOCKS WRITE ACQUIRED WHILE EXITING !!!",
+ (nti) ? nti->tag : "(unset)", gettid_cached(), nti->rwspinlock_write_locks);
+
+ if(nd_thread_status_check(nti, NETDATA_THREAD_OPTION_DONT_LOG_CLEANUP) != NETDATA_THREAD_OPTION_DONT_LOG_CLEANUP)
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "thread with task id %d finished", nti->tid);
+
+ rrd_collector_finished();
+ sender_thread_buffer_free();
+ rrdset_thread_rda_free();
+ query_target_free();
+ thread_cache_destroy();
+ service_exits();
+ worker_unregister();
+
+ nd_thread_status_set(nti, NETDATA_THREAD_STATUS_FINISHED);
+
+ spinlock_lock(&threads_globals.running.spinlock);
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next);
+ spinlock_unlock(&threads_globals.running.spinlock);
+
+ if (nd_thread_status_check(nti, NETDATA_THREAD_OPTION_JOINABLE) != NETDATA_THREAD_OPTION_JOINABLE) {
+ spinlock_lock(&threads_globals.exited.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.exited.list, nti, prev, next);
+ spinlock_unlock(&threads_globals.exited.spinlock);
+ }
+}
+
+static void *nd_thread_starting_point(void *ptr) {
+ ND_THREAD *nti = _nd_thread_info = (ND_THREAD *)ptr;
+ nd_thread_status_set(nti, NETDATA_THREAD_STATUS_STARTED);
+
+ nti->tid = gettid_cached();
+ nd_thread_tag_set(nti->tag);
+
+ if(nd_thread_status_check(nti, NETDATA_THREAD_OPTION_DONT_LOG_STARTUP) != NETDATA_THREAD_OPTION_DONT_LOG_STARTUP)
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "thread created with task id %d", gettid_cached());
+
+ if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "cannot set pthread cancel type to DEFERRED.");
+
+ if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "cannot set pthread cancel state to ENABLE.");
+
+ CLEANUP_FUNCTION_REGISTER(nd_thread_exit) cleanup_ptr = nti;
+
+ // run the thread code
+ nti->ret = nti->start_routine(nti->arg);
+
+ return nti;
+}
+
+ND_THREAD *nd_thread_self(void) {
+ return _nd_thread_info;
+}
+
+bool nd_thread_is_me(ND_THREAD *nti) {
+ return nti && nti->thread == pthread_self();
+}
+
+ND_THREAD *nd_thread_create(const char *tag, NETDATA_THREAD_OPTIONS options, void *(*start_routine)(void *), void *arg) {
+ nd_thread_join_exited_detached_threads();
+
+ ND_THREAD *nti = callocz(1, sizeof(*nti));
+ spinlock_init(&nti->canceller.spinlock);
+ nti->arg = arg;
+ nti->start_routine = start_routine;
+ nti->options = options & NETDATA_THREAD_OPTIONS_ALL;
+ strncpyz(nti->tag, tag, ND_THREAD_TAG_MAX);
+
+ spinlock_lock(&threads_globals.running.spinlock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next);
+ spinlock_unlock(&threads_globals.running.spinlock);
+
+ int ret = pthread_create(&nti->thread, threads_globals.attr, nd_thread_starting_point, nti);
+ if(ret != 0) {
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "failed to create new thread for %s. pthread_create() failed with code %d",
+ tag, ret);
+
+ spinlock_lock(&threads_globals.running.spinlock);
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(threads_globals.running.list, nti, prev, next);
+ spinlock_unlock(&threads_globals.running.spinlock);
+ freez(nti);
+ return NULL;
+ }
+
+ return nti;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+void nd_thread_register_canceller(nd_thread_canceller cb, void *data) {
+ ND_THREAD *nti = _nd_thread_info;
+ if(!nti) return;
+
+ spinlock_lock(&nti->canceller.spinlock);
+ nti->canceller.cb = cb;
+ nti->canceller.data = data;
+ spinlock_unlock(&nti->canceller.spinlock);
+}
+
+void nd_thread_signal_cancel(ND_THREAD *nti) {
+ if(!nti) return;
+
+ __atomic_store_n(&nti->cancel_atomic, true, __ATOMIC_RELAXED);
+
+ spinlock_lock(&nti->canceller.spinlock);
+ if(nti->canceller.cb)
+ nti->canceller.cb(nti->canceller.data);
+ spinlock_unlock(&nti->canceller.spinlock);
+}
+
+bool nd_thread_signaled_to_cancel(void) {
+ if(!_nd_thread_info) return false;
+ return __atomic_load_n(&_nd_thread_info->cancel_atomic, __ATOMIC_RELAXED);
+}
+
+// ----------------------------------------------------------------------------
+// nd_thread_join
+
+void nd_thread_join(ND_THREAD *nti) {
+ if(!nti) return;
+
+ int ret = pthread_join(nti->thread, NULL);
+ if(ret != 0)
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "cannot join thread. pthread_join() failed with code %d.", ret);
+ else {
+ nd_thread_status_set(nti, NETDATA_THREAD_STATUS_JOINED);
+
+ spinlock_lock(&threads_globals.exited.spinlock);
+ if(nti->prev)
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(threads_globals.exited.list, nti, prev, next);
+ spinlock_unlock(&threads_globals.exited.spinlock);
+
+ freez(nti);
+ }
+}