summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-08-12 07:26:11 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-08-12 07:26:11 +0000
commit3c315f0fff93aa072472abc10815963ac0035268 (patch)
treea95f6a96e0e7bd139c010f8dc60b40e5b3062a99 /streaming/rrdpush.c
parentAdding upstream version 1.35.1. (diff)
downloadnetdata-3c315f0fff93aa072472abc10815963ac0035268.tar.xz
netdata-3c315f0fff93aa072472abc10815963ac0035268.zip
Adding upstream version 1.36.0.upstream/1.36.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--streaming/rrdpush.c68
1 files changed, 28 insertions, 40 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 77774d8d3..b73f24633 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -75,7 +75,8 @@ int rrdpush_init() {
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
- rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
+ rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time);
+
#ifdef ENABLE_COMPRESSION
default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
"enable compression", default_compression_enabled);
@@ -97,10 +98,10 @@ int rrdpush_init() {
}
}
- char *invalid_certificate = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", "no");
+ bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO);
- if ( !strcmp(invalid_certificate,"yes")){
- if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
+ if(invalid_certificate == CONFIG_BOOLEAN_YES){
+ if(netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
info("Netdata is configured to accept invalid SSL certificate.");
netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
}
@@ -193,20 +194,15 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
}
// chart labels
+static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls);
+ return 1;
+}
void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) {
- struct label_index *labels_c = &st->state->labels;
- if (labels_c) {
- netdata_rwlock_rdlock(&host->labels.labels_rwlock);
- struct label *lbl = labels_c->head;
- while(lbl) {
- buffer_sprintf(host->sender->build,
- "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source);
-
- lbl = lbl->next;
- }
- if (labels_c->head)
+ if (st->state && st->state->chart_labels) {
+ if(rrdlabels_walkthrough_read(st->state->chart_labels, send_clabels_callback, host->sender->build) > 0)
buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n");
- netdata_rwlock_unlock(&host->labels.labels_rwlock);
}
}
@@ -277,11 +273,11 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
RRDSETVAR *rs;
for(rs = st->variables; rs ;rs = rs->next) {
if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) {
- calculated_number *value = (calculated_number *) rs->value;
+ NETDATA_DOUBLE *value = (NETDATA_DOUBLE *) rs->value;
buffer_sprintf(
host->sender->build
- , "VARIABLE CHART %s = " CALCULATED_NUMBER_FORMAT "\n"
+ , "VARIABLE CHART %s = " NETDATA_DOUBLE_FORMAT "\n"
, rs->variable
, *value
);
@@ -338,7 +334,7 @@ void rrdset_done_push(RRDSET *st) {
rrdpush_sender_thread_spawn(host);
// Handle non-connected case
- if(unlikely(!host->rrdpush_sender_connected)) {
+ if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) {
if(unlikely(!host->rrdpush_sender_error_shown))
error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname);
host->rrdpush_sender_error_shown = 1;
@@ -364,41 +360,30 @@ void rrdset_done_push(RRDSET *st) {
}
// labels
+static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
+ return 1;
+}
void rrdpush_send_labels(RRDHOST *host) {
- if (!host->labels.head || !(host->labels.labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels.labels_flag & LABEL_FLAG_STOP_STREAM))
+ if (!host->host_labels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP)))
return;
sender_start(host->sender);
- rrdhost_rdlock(host);
- netdata_rwlock_rdlock(&host->labels.labels_rwlock);
-
- struct label *label_i = host->labels.head;
- while(label_i) {
- buffer_sprintf(host->sender->build
- , "LABEL \"%s\" = %d %s\n"
- , label_i->key
- , (int)label_i->label_source
- , label_i->value);
- label_i = label_i->next;
- }
-
- buffer_sprintf(host->sender->build
- , "OVERWRITE %s\n", "labels");
-
- netdata_rwlock_unlock(&host->labels.labels_rwlock);
- rrdhost_unlock(host);
+ rrdlabels_walkthrough_read(host->host_labels, send_labels_callback, host->sender->build);
+ buffer_sprintf(host->sender->build, "OVERWRITE %s\n", "labels");
sender_commit(host->sender);
if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
- host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
}
void rrdpush_claimed_id(RRDHOST *host)
{
- if(unlikely(!host->rrdpush_send_enabled || !host->rrdpush_sender_connected))
+ if(unlikely(!host->rrdpush_send_enabled || !__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)))
return;
if(host->sender->version < STREAM_VERSION_CLAIM)
@@ -504,6 +489,9 @@ struct rrdpush_destinations *destinations_init(const char *dests) {
// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
void rrdpush_sender_thread_stop(RRDHOST *host) {
+ if (!host->sender)
+ return;
+
netdata_mutex_lock(&host->sender->mutex);
netdata_thread_t thr = 0;