diff options
Diffstat (limited to 'collectors/statsd.plugin/statsd.c')
-rw-r--r-- | collectors/statsd.plugin/statsd.c | 93 |
1 files changed, 52 insertions, 41 deletions
diff --git a/collectors/statsd.plugin/statsd.c b/collectors/statsd.plugin/statsd.c index 67d7ed2e2..d15129b9c 100644 --- a/collectors/statsd.plugin/statsd.c +++ b/collectors/statsd.plugin/statsd.c @@ -234,7 +234,8 @@ typedef struct statsd_app { // global statsd data struct collection_thread_status { - int status; + SPINLOCK spinlock; + bool running; size_t max_sockets; netdata_thread_t thread; @@ -433,7 +434,7 @@ static inline NETDATA_DOUBLE statsd_parse_float(const char *v, NETDATA_DOUBLE de char *e = NULL; value = str2ndd(v, &e); if(unlikely(e && *e)) - error("STATSD: excess data '%s' after value '%s'", e, v); + collector_error("STATSD: excess data '%s' after value '%s'", e, v); } else value = def; @@ -455,7 +456,7 @@ static inline long long statsd_parse_int(const char *v, long long def) { char *e = NULL; value = str2ll(v, &e); if(unlikely(e && *e)) - error("STATSD: excess data '%s' after value '%s'", e, v); + collector_error("STATSD: excess data '%s' after value '%s'", e, v); } else value = def; @@ -483,7 +484,7 @@ static inline void statsd_process_gauge(STATSD_METRIC *m, const char *value, con if(!is_metric_useful_for_collection(m)) return; if(unlikely(!value || !*value)) { - error("STATSD: metric '%s' of type gauge, with empty value is ignored.", m->name); + collector_error("STATSD: metric '%s' of type gauge, with empty value is ignored.", m->name); return; } @@ -531,7 +532,7 @@ static inline void statsd_process_histogram_or_timer(STATSD_METRIC *m, const cha if(!is_metric_useful_for_collection(m)) return; if(unlikely(!value || !*value)) { - error("STATSD: metric of type %s, with empty value is ignored.", type); + collector_error("STATSD: metric of type %s, with empty value is ignored.", type); return; } @@ -594,7 +595,7 @@ static inline void statsd_process_set(STATSD_METRIC *m, const char *value) { } if (unlikely(!m->set.dict)) { - m->set.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS); + m->set.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0); dictionary_register_insert_callback(m->set.dict, dictionary_metric_set_value_insert_callback, m); m->set.unique = 0; } @@ -634,7 +635,7 @@ static inline void statsd_process_dictionary(STATSD_METRIC *m, const char *value statsd_reset_metric(m); if (unlikely(!m->dictionary.dict)) { - m->dictionary.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS); + m->dictionary.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0); dictionary_register_insert_callback(m->dictionary.dict, dictionary_metric_dict_value_insert_callback, m); m->dictionary.unique = 0; } @@ -873,21 +874,19 @@ struct statsd_tcp { char buffer[]; }; -#ifdef HAVE_RECVMMSG struct statsd_udp { - int *running; + struct collection_thread_status *status; STATSD_SOCKET_DATA_TYPE type; + +#ifdef HAVE_RECVMMSG size_t size; struct iovec *iovecs; struct mmsghdr *msgs; -}; #else -struct statsd_udp { int *running; - STATSD_SOCKET_DATA_TYPE type; char buffer[STATSD_UDP_BUFFER_SIZE]; -}; #endif +}; // new TCP client connected static void *statsd_add_callback(POLLINFO *pi, short int *events, void *data) { @@ -1097,9 +1096,11 @@ static int statsd_snd_callback(POLLINFO *pi, short int *events) { void statsd_collector_thread_cleanup(void *data) { struct statsd_udp *d = data; - *d->running = 0; + netdata_spinlock_lock(&d->status->spinlock); + d->status->running = false; + netdata_spinlock_unlock(&d->status->spinlock); - info("cleaning up..."); + collector_info("cleaning up..."); #ifdef HAVE_RECVMMSG size_t i; @@ -1114,9 +1115,15 @@ void statsd_collector_thread_cleanup(void *data) { worker_unregister(); } +static bool statsd_should_stop(void) { + return !service_running(SERVICE_COLLECTORS); +} + void *statsd_collector_thread(void *ptr) { struct collection_thread_status *status = ptr; - status->status = 1; + netdata_spinlock_lock(&status->spinlock); + status->running = true; + netdata_spinlock_unlock(&status->spinlock); worker_register("STATSD"); worker_register_job_name(WORKER_JOB_TYPE_TCP_CONNECTED, "tcp connect"); @@ -1124,10 +1131,10 @@ void *statsd_collector_thread(void *ptr) { worker_register_job_name(WORKER_JOB_TYPE_RCV_DATA, "receive"); worker_register_job_name(WORKER_JOB_TYPE_SND_DATA, "send"); - info("STATSD collector thread started with taskid %d", gettid()); + collector_info("STATSD collector thread started with taskid %d", gettid()); struct statsd_udp *d = callocz(sizeof(struct statsd_udp), 1); - d->running = &status->status; + d->status = status; netdata_thread_cleanup_push(statsd_collector_thread_cleanup, d); @@ -1152,6 +1159,7 @@ void *statsd_collector_thread(void *ptr) { , statsd_rcv_callback , statsd_snd_callback , NULL + , statsd_should_stop , NULL // No access control pattern , 0 // No dns lookups for access control pattern , (void *)d @@ -1329,7 +1337,7 @@ static int statsd_readfile(const char *filename, STATSD_APP *app, STATSD_APP_CHA else if(app) { if(!strcmp(s, "dictionary")) { if(!app->dict) - app->dict = dictionary_create(DICT_OPTION_SINGLE_THREADED); + app->dict = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED, &dictionary_stats_category_collectors, 0); dict = app->dict; } @@ -1934,7 +1942,7 @@ static inline void statsd_flush_dictionary(STATSD_METRIC *m) { if(m->dictionary.unique >= statsd.dictionary_max_unique) { if(!(m->options & STATSD_METRIC_OPTION_COLLECTION_FULL_LOGGED)) { m->options |= STATSD_METRIC_OPTION_COLLECTION_FULL_LOGGED; - info( + collector_info( "STATSD dictionary '%s' reach max of %zu items - try increasing 'dictionaries max unique dimensions' in netdata.conf", m->name, m->dictionary.unique); @@ -2307,7 +2315,7 @@ static inline void statsd_flush_index_metrics(STATSD_INDEX *index, void (*flush_ if(unlikely(!(m->options & STATSD_METRIC_OPTION_PRIVATE_CHART_CHECKED))) { if(unlikely(statsd.private_charts >= statsd.max_private_charts_hard)) { debug(D_STATSD, "STATSD: metric '%s' will not be charted, because the hard limit of the maximum number of charts has been reached.", m->name); - info("STATSD: metric '%s' will not be charted, because the hard limit of the maximum number of charts (%zu) has been reached. Increase the number of charts by editing netdata.conf, [statsd] section.", m->name, statsd.max_private_charts_hard); + collector_info("STATSD: metric '%s' will not be charted, because the hard limit of the maximum number of charts (%zu) has been reached. Increase the number of charts by editing netdata.conf, [statsd] section.", m->name, statsd.max_private_charts_hard); m->options &= ~STATSD_METRIC_OPTION_PRIVATE_CHART_ENABLED; } else { @@ -2353,22 +2361,24 @@ static int statsd_listen_sockets_setup(void) { static void statsd_main_cleanup(void *data) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - info("cleaning up..."); + collector_info("cleaning up..."); if (statsd.collection_threads_status) { int i; for (i = 0; i < statsd.threads; i++) { - if(statsd.collection_threads_status[i].status) { - info("STATSD: stopping data collection thread %d...", i + 1); + netdata_spinlock_lock(&statsd.collection_threads_status[i].spinlock); + if(statsd.collection_threads_status[i].running) { + collector_info("STATSD: stopping data collection thread %d...", i + 1); netdata_thread_cancel(statsd.collection_threads_status[i].thread); } else { - info("STATSD: data collection thread %d found stopped.", i + 1); + collector_info("STATSD: data collection thread %d found stopped.", i + 1); } + netdata_spinlock_unlock(&statsd.collection_threads_status[i].spinlock); } } - info("STATSD: closing sockets..."); + collector_info("STATSD: closing sockets..."); listen_sockets_close(&statsd.sockets); // destroy the dictionaries @@ -2380,7 +2390,7 @@ static void statsd_main_cleanup(void *data) { dictionary_destroy(statsd.sets.dict); dictionary_destroy(statsd.timers.dict); - info("STATSD: cleanup completed."); + collector_info("STATSD: cleanup completed."); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; worker_unregister(); @@ -2412,13 +2422,13 @@ void *statsd_main(void *ptr) { netdata_thread_cleanup_push(statsd_main_cleanup, ptr); - statsd.gauges.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS); - statsd.meters.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS); - statsd.counters.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS); - statsd.histograms.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS); - statsd.dictionaries.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS); - statsd.sets.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS); - statsd.timers.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS); + statsd.gauges.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0); + statsd.meters.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0); + statsd.counters.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0); + statsd.histograms.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0); + statsd.dictionaries.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0); + statsd.sets.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0); + statsd.timers.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0); dictionary_register_insert_callback(statsd.gauges.dict, dictionary_metric_insert_callback, &statsd.gauges); dictionary_register_insert_callback(statsd.meters.dict, dictionary_metric_insert_callback, &statsd.meters); @@ -2444,7 +2454,7 @@ void *statsd_main(void *ptr) { statsd.update_every = default_rrd_update_every; statsd.update_every = (int)config_get_number(CONFIG_SECTION_STATSD, "update every (flushInterval)", statsd.update_every); if(statsd.update_every < default_rrd_update_every) { - error("STATSD: minimum flush interval %d given, but the minimum is the update every of netdata. Using %d", statsd.update_every, default_rrd_update_every); + collector_error("STATSD: minimum flush interval %d given, but the minimum is the update every of netdata. Using %d", statsd.update_every, default_rrd_update_every); statsd.update_every = default_rrd_update_every; } @@ -2461,7 +2471,7 @@ void *statsd_main(void *ptr) { statsd.histogram_percentile = (double)config_get_float(CONFIG_SECTION_STATSD, "histograms and timers percentile (percentThreshold)", statsd.histogram_percentile); if(isless(statsd.histogram_percentile, 0) || isgreater(statsd.histogram_percentile, 100)) { - error("STATSD: invalid histograms and timers percentile %0.5f given", statsd.histogram_percentile); + collector_error("STATSD: invalid histograms and timers percentile %0.5f given", statsd.histogram_percentile); statsd.histogram_percentile = 95.0; } { @@ -2508,7 +2518,7 @@ void *statsd_main(void *ptr) { #ifdef STATSD_MULTITHREADED statsd.threads = (int)config_get_number(CONFIG_SECTION_STATSD, "threads", processors); if(statsd.threads < 1) { - error("STATSD: Invalid number of threads %d, using %d", statsd.threads, processors); + collector_error("STATSD: Invalid number of threads %d, using %d", statsd.threads, processors); statsd.threads = processors; config_set_number(CONFIG_SECTION_STATSD, "collector threads", statsd.threads); } @@ -2526,7 +2536,7 @@ void *statsd_main(void *ptr) { statsd_listen_sockets_setup(); if(!statsd.sockets.opened) { - error("STATSD: No statsd sockets to listen to. statsd will be disabled."); + collector_error("STATSD: No statsd sockets to listen to. statsd will be disabled."); goto cleanup; } @@ -2536,7 +2546,8 @@ void *statsd_main(void *ptr) { for(i = 0; i < statsd.threads ;i++) { statsd.collection_threads_status[i].max_sockets = max_sockets / statsd.threads; char tag[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STATSD_COLLECTOR[%d]", i + 1); + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STATSD_IN[%d]", i + 1); + netdata_spinlock_init(&statsd.collection_threads_status[i].spinlock); netdata_thread_create(&statsd.collection_threads_status[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT, statsd_collector_thread, &statsd.collection_threads_status[i]); } @@ -2753,7 +2764,7 @@ void *statsd_main(void *ptr) { usec_t step = statsd.update_every * USEC_PER_SEC; heartbeat_t hb; heartbeat_init(&hb); - while(!netdata_exit) { + while(service_running(SERVICE_COLLECTORS)) { worker_is_idle(); heartbeat_next(&hb, step); @@ -2781,7 +2792,7 @@ void *statsd_main(void *ptr) { worker_is_busy(WORKER_STATSD_FLUSH_STATS); statsd_update_all_app_charts(); - if(unlikely(netdata_exit)) + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; if(global_statistics_enabled) { |