diff options
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r-- | streaming/rrdpush.c | 68 |
1 files changed, 28 insertions, 40 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 77774d8d..b73f2463 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -75,7 +75,8 @@ int rrdpush_init() { default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", ""); default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", ""); default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*"); - rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time); + rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time); + #ifdef ENABLE_COMPRESSION default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enable compression", default_compression_enabled); @@ -97,10 +98,10 @@ int rrdpush_init() { } } - char *invalid_certificate = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", "no"); + bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO); - if ( !strcmp(invalid_certificate,"yes")){ - if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ + if(invalid_certificate == CONFIG_BOOLEAN_YES){ + if(netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ info("Netdata is configured to accept invalid SSL certificate."); netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE; } @@ -193,20 +194,15 @@ static inline int need_to_send_chart_definition(RRDSET *st) { } // chart labels +static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + BUFFER *wb = (BUFFER *)data; + buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls); + return 1; +} void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) { - struct label_index *labels_c = &st->state->labels; - if (labels_c) { - netdata_rwlock_rdlock(&host->labels.labels_rwlock); - struct label *lbl = labels_c->head; - while(lbl) { - buffer_sprintf(host->sender->build, - "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source); - - lbl = lbl->next; - } - if (labels_c->head) + if (st->state && st->state->chart_labels) { + if(rrdlabels_walkthrough_read(st->state->chart_labels, send_clabels_callback, host->sender->build) > 0) buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n"); - netdata_rwlock_unlock(&host->labels.labels_rwlock); } } @@ -277,11 +273,11 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { RRDSETVAR *rs; for(rs = st->variables; rs ;rs = rs->next) { if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) { - calculated_number *value = (calculated_number *) rs->value; + NETDATA_DOUBLE *value = (NETDATA_DOUBLE *) rs->value; buffer_sprintf( host->sender->build - , "VARIABLE CHART %s = " CALCULATED_NUMBER_FORMAT "\n" + , "VARIABLE CHART %s = " NETDATA_DOUBLE_FORMAT "\n" , rs->variable , *value ); @@ -338,7 +334,7 @@ void rrdset_done_push(RRDSET *st) { rrdpush_sender_thread_spawn(host); // Handle non-connected case - if(unlikely(!host->rrdpush_sender_connected)) { + if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) { if(unlikely(!host->rrdpush_sender_error_shown)) error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname); host->rrdpush_sender_error_shown = 1; @@ -364,41 +360,30 @@ void rrdset_done_push(RRDSET *st) { } // labels +static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + BUFFER *wb = (BUFFER *)data; + buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value); + return 1; +} void rrdpush_send_labels(RRDHOST *host) { - if (!host->labels.head || !(host->labels.labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels.labels_flag & LABEL_FLAG_STOP_STREAM)) + if (!host->host_labels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP))) return; sender_start(host->sender); - rrdhost_rdlock(host); - netdata_rwlock_rdlock(&host->labels.labels_rwlock); - - struct label *label_i = host->labels.head; - while(label_i) { - buffer_sprintf(host->sender->build - , "LABEL \"%s\" = %d %s\n" - , label_i->key - , (int)label_i->label_source - , label_i->value); - label_i = label_i->next; - } - - buffer_sprintf(host->sender->build - , "OVERWRITE %s\n", "labels"); - - netdata_rwlock_unlock(&host->labels.labels_rwlock); - rrdhost_unlock(host); + rrdlabels_walkthrough_read(host->host_labels, send_labels_callback, host->sender->build); + buffer_sprintf(host->sender->build, "OVERWRITE %s\n", "labels"); sender_commit(host->sender); if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) error("STREAM %s [send]: cannot write to internal pipe", host->hostname); - host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); } void rrdpush_claimed_id(RRDHOST *host) { - if(unlikely(!host->rrdpush_send_enabled || !host->rrdpush_sender_connected)) + if(unlikely(!host->rrdpush_send_enabled || !__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) return; if(host->sender->version < STREAM_VERSION_CLAIM) @@ -504,6 +489,9 @@ struct rrdpush_destinations *destinations_init(const char *dests) { // The sender mutex guards thread creation, any spurious data is wiped on reconnection. void rrdpush_sender_thread_stop(RRDHOST *host) { + if (!host->sender) + return; + netdata_mutex_lock(&host->sender->mutex); netdata_thread_t thr = 0; |