summaryrefslogtreecommitdiffstats
path: root/collectors/statsd.plugin/statsd.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/statsd.plugin/statsd.c')
-rw-r--r--collectors/statsd.plugin/statsd.c93
1 files changed, 52 insertions, 41 deletions
diff --git a/collectors/statsd.plugin/statsd.c b/collectors/statsd.plugin/statsd.c
index 67d7ed2e2..d15129b9c 100644
--- a/collectors/statsd.plugin/statsd.c
+++ b/collectors/statsd.plugin/statsd.c
@@ -234,7 +234,8 @@ typedef struct statsd_app {
// global statsd data
struct collection_thread_status {
- int status;
+ SPINLOCK spinlock;
+ bool running;
size_t max_sockets;
netdata_thread_t thread;
@@ -433,7 +434,7 @@ static inline NETDATA_DOUBLE statsd_parse_float(const char *v, NETDATA_DOUBLE de
char *e = NULL;
value = str2ndd(v, &e);
if(unlikely(e && *e))
- error("STATSD: excess data '%s' after value '%s'", e, v);
+ collector_error("STATSD: excess data '%s' after value '%s'", e, v);
}
else
value = def;
@@ -455,7 +456,7 @@ static inline long long statsd_parse_int(const char *v, long long def) {
char *e = NULL;
value = str2ll(v, &e);
if(unlikely(e && *e))
- error("STATSD: excess data '%s' after value '%s'", e, v);
+ collector_error("STATSD: excess data '%s' after value '%s'", e, v);
}
else
value = def;
@@ -483,7 +484,7 @@ static inline void statsd_process_gauge(STATSD_METRIC *m, const char *value, con
if(!is_metric_useful_for_collection(m)) return;
if(unlikely(!value || !*value)) {
- error("STATSD: metric '%s' of type gauge, with empty value is ignored.", m->name);
+ collector_error("STATSD: metric '%s' of type gauge, with empty value is ignored.", m->name);
return;
}
@@ -531,7 +532,7 @@ static inline void statsd_process_histogram_or_timer(STATSD_METRIC *m, const cha
if(!is_metric_useful_for_collection(m)) return;
if(unlikely(!value || !*value)) {
- error("STATSD: metric of type %s, with empty value is ignored.", type);
+ collector_error("STATSD: metric of type %s, with empty value is ignored.", type);
return;
}
@@ -594,7 +595,7 @@ static inline void statsd_process_set(STATSD_METRIC *m, const char *value) {
}
if (unlikely(!m->set.dict)) {
- m->set.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ m->set.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0);
dictionary_register_insert_callback(m->set.dict, dictionary_metric_set_value_insert_callback, m);
m->set.unique = 0;
}
@@ -634,7 +635,7 @@ static inline void statsd_process_dictionary(STATSD_METRIC *m, const char *value
statsd_reset_metric(m);
if (unlikely(!m->dictionary.dict)) {
- m->dictionary.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ m->dictionary.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0);
dictionary_register_insert_callback(m->dictionary.dict, dictionary_metric_dict_value_insert_callback, m);
m->dictionary.unique = 0;
}
@@ -873,21 +874,19 @@ struct statsd_tcp {
char buffer[];
};
-#ifdef HAVE_RECVMMSG
struct statsd_udp {
- int *running;
+ struct collection_thread_status *status;
STATSD_SOCKET_DATA_TYPE type;
+
+#ifdef HAVE_RECVMMSG
size_t size;
struct iovec *iovecs;
struct mmsghdr *msgs;
-};
#else
-struct statsd_udp {
int *running;
- STATSD_SOCKET_DATA_TYPE type;
char buffer[STATSD_UDP_BUFFER_SIZE];
-};
#endif
+};
// new TCP client connected
static void *statsd_add_callback(POLLINFO *pi, short int *events, void *data) {
@@ -1097,9 +1096,11 @@ static int statsd_snd_callback(POLLINFO *pi, short int *events) {
void statsd_collector_thread_cleanup(void *data) {
struct statsd_udp *d = data;
- *d->running = 0;
+ netdata_spinlock_lock(&d->status->spinlock);
+ d->status->running = false;
+ netdata_spinlock_unlock(&d->status->spinlock);
- info("cleaning up...");
+ collector_info("cleaning up...");
#ifdef HAVE_RECVMMSG
size_t i;
@@ -1114,9 +1115,15 @@ void statsd_collector_thread_cleanup(void *data) {
worker_unregister();
}
+static bool statsd_should_stop(void) {
+ return !service_running(SERVICE_COLLECTORS);
+}
+
void *statsd_collector_thread(void *ptr) {
struct collection_thread_status *status = ptr;
- status->status = 1;
+ netdata_spinlock_lock(&status->spinlock);
+ status->running = true;
+ netdata_spinlock_unlock(&status->spinlock);
worker_register("STATSD");
worker_register_job_name(WORKER_JOB_TYPE_TCP_CONNECTED, "tcp connect");
@@ -1124,10 +1131,10 @@ void *statsd_collector_thread(void *ptr) {
worker_register_job_name(WORKER_JOB_TYPE_RCV_DATA, "receive");
worker_register_job_name(WORKER_JOB_TYPE_SND_DATA, "send");
- info("STATSD collector thread started with taskid %d", gettid());
+ collector_info("STATSD collector thread started with taskid %d", gettid());
struct statsd_udp *d = callocz(sizeof(struct statsd_udp), 1);
- d->running = &status->status;
+ d->status = status;
netdata_thread_cleanup_push(statsd_collector_thread_cleanup, d);
@@ -1152,6 +1159,7 @@ void *statsd_collector_thread(void *ptr) {
, statsd_rcv_callback
, statsd_snd_callback
, NULL
+ , statsd_should_stop
, NULL // No access control pattern
, 0 // No dns lookups for access control pattern
, (void *)d
@@ -1329,7 +1337,7 @@ static int statsd_readfile(const char *filename, STATSD_APP *app, STATSD_APP_CHA
else if(app) {
if(!strcmp(s, "dictionary")) {
if(!app->dict)
- app->dict = dictionary_create(DICT_OPTION_SINGLE_THREADED);
+ app->dict = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED, &dictionary_stats_category_collectors, 0);
dict = app->dict;
}
@@ -1934,7 +1942,7 @@ static inline void statsd_flush_dictionary(STATSD_METRIC *m) {
if(m->dictionary.unique >= statsd.dictionary_max_unique) {
if(!(m->options & STATSD_METRIC_OPTION_COLLECTION_FULL_LOGGED)) {
m->options |= STATSD_METRIC_OPTION_COLLECTION_FULL_LOGGED;
- info(
+ collector_info(
"STATSD dictionary '%s' reach max of %zu items - try increasing 'dictionaries max unique dimensions' in netdata.conf",
m->name,
m->dictionary.unique);
@@ -2307,7 +2315,7 @@ static inline void statsd_flush_index_metrics(STATSD_INDEX *index, void (*flush_
if(unlikely(!(m->options & STATSD_METRIC_OPTION_PRIVATE_CHART_CHECKED))) {
if(unlikely(statsd.private_charts >= statsd.max_private_charts_hard)) {
debug(D_STATSD, "STATSD: metric '%s' will not be charted, because the hard limit of the maximum number of charts has been reached.", m->name);
- info("STATSD: metric '%s' will not be charted, because the hard limit of the maximum number of charts (%zu) has been reached. Increase the number of charts by editing netdata.conf, [statsd] section.", m->name, statsd.max_private_charts_hard);
+ collector_info("STATSD: metric '%s' will not be charted, because the hard limit of the maximum number of charts (%zu) has been reached. Increase the number of charts by editing netdata.conf, [statsd] section.", m->name, statsd.max_private_charts_hard);
m->options &= ~STATSD_METRIC_OPTION_PRIVATE_CHART_ENABLED;
}
else {
@@ -2353,22 +2361,24 @@ static int statsd_listen_sockets_setup(void) {
static void statsd_main_cleanup(void *data) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- info("cleaning up...");
+ collector_info("cleaning up...");
if (statsd.collection_threads_status) {
int i;
for (i = 0; i < statsd.threads; i++) {
- if(statsd.collection_threads_status[i].status) {
- info("STATSD: stopping data collection thread %d...", i + 1);
+ netdata_spinlock_lock(&statsd.collection_threads_status[i].spinlock);
+ if(statsd.collection_threads_status[i].running) {
+ collector_info("STATSD: stopping data collection thread %d...", i + 1);
netdata_thread_cancel(statsd.collection_threads_status[i].thread);
}
else {
- info("STATSD: data collection thread %d found stopped.", i + 1);
+ collector_info("STATSD: data collection thread %d found stopped.", i + 1);
}
+ netdata_spinlock_unlock(&statsd.collection_threads_status[i].spinlock);
}
}
- info("STATSD: closing sockets...");
+ collector_info("STATSD: closing sockets...");
listen_sockets_close(&statsd.sockets);
// destroy the dictionaries
@@ -2380,7 +2390,7 @@ static void statsd_main_cleanup(void *data) {
dictionary_destroy(statsd.sets.dict);
dictionary_destroy(statsd.timers.dict);
- info("STATSD: cleanup completed.");
+ collector_info("STATSD: cleanup completed.");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
worker_unregister();
@@ -2412,13 +2422,13 @@ void *statsd_main(void *ptr) {
netdata_thread_cleanup_push(statsd_main_cleanup, ptr);
- statsd.gauges.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
- statsd.meters.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
- statsd.counters.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
- statsd.histograms.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
- statsd.dictionaries.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
- statsd.sets.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
- statsd.timers.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ statsd.gauges.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0);
+ statsd.meters.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0);
+ statsd.counters.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0);
+ statsd.histograms.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0);
+ statsd.dictionaries.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0);
+ statsd.sets.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0);
+ statsd.timers.dict = dictionary_create_advanced(STATSD_DICTIONARY_OPTIONS, &dictionary_stats_category_collectors, 0);
dictionary_register_insert_callback(statsd.gauges.dict, dictionary_metric_insert_callback, &statsd.gauges);
dictionary_register_insert_callback(statsd.meters.dict, dictionary_metric_insert_callback, &statsd.meters);
@@ -2444,7 +2454,7 @@ void *statsd_main(void *ptr) {
statsd.update_every = default_rrd_update_every;
statsd.update_every = (int)config_get_number(CONFIG_SECTION_STATSD, "update every (flushInterval)", statsd.update_every);
if(statsd.update_every < default_rrd_update_every) {
- error("STATSD: minimum flush interval %d given, but the minimum is the update every of netdata. Using %d", statsd.update_every, default_rrd_update_every);
+ collector_error("STATSD: minimum flush interval %d given, but the minimum is the update every of netdata. Using %d", statsd.update_every, default_rrd_update_every);
statsd.update_every = default_rrd_update_every;
}
@@ -2461,7 +2471,7 @@ void *statsd_main(void *ptr) {
statsd.histogram_percentile = (double)config_get_float(CONFIG_SECTION_STATSD, "histograms and timers percentile (percentThreshold)", statsd.histogram_percentile);
if(isless(statsd.histogram_percentile, 0) || isgreater(statsd.histogram_percentile, 100)) {
- error("STATSD: invalid histograms and timers percentile %0.5f given", statsd.histogram_percentile);
+ collector_error("STATSD: invalid histograms and timers percentile %0.5f given", statsd.histogram_percentile);
statsd.histogram_percentile = 95.0;
}
{
@@ -2508,7 +2518,7 @@ void *statsd_main(void *ptr) {
#ifdef STATSD_MULTITHREADED
statsd.threads = (int)config_get_number(CONFIG_SECTION_STATSD, "threads", processors);
if(statsd.threads < 1) {
- error("STATSD: Invalid number of threads %d, using %d", statsd.threads, processors);
+ collector_error("STATSD: Invalid number of threads %d, using %d", statsd.threads, processors);
statsd.threads = processors;
config_set_number(CONFIG_SECTION_STATSD, "collector threads", statsd.threads);
}
@@ -2526,7 +2536,7 @@ void *statsd_main(void *ptr) {
statsd_listen_sockets_setup();
if(!statsd.sockets.opened) {
- error("STATSD: No statsd sockets to listen to. statsd will be disabled.");
+ collector_error("STATSD: No statsd sockets to listen to. statsd will be disabled.");
goto cleanup;
}
@@ -2536,7 +2546,8 @@ void *statsd_main(void *ptr) {
for(i = 0; i < statsd.threads ;i++) {
statsd.collection_threads_status[i].max_sockets = max_sockets / statsd.threads;
char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STATSD_COLLECTOR[%d]", i + 1);
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STATSD_IN[%d]", i + 1);
+ netdata_spinlock_init(&statsd.collection_threads_status[i].spinlock);
netdata_thread_create(&statsd.collection_threads_status[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT, statsd_collector_thread, &statsd.collection_threads_status[i]);
}
@@ -2753,7 +2764,7 @@ void *statsd_main(void *ptr) {
usec_t step = statsd.update_every * USEC_PER_SEC;
heartbeat_t hb;
heartbeat_init(&hb);
- while(!netdata_exit) {
+ while(service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
heartbeat_next(&hb, step);
@@ -2781,7 +2792,7 @@ void *statsd_main(void *ptr) {
worker_is_busy(WORKER_STATSD_FLUSH_STATS);
statsd_update_all_app_charts();
- if(unlikely(netdata_exit))
+ if(unlikely(!service_running(SERVICE_COLLECTORS)))
break;
if(global_statistics_enabled) {