diff options
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- | streaming/sender.c | 42 |
1 files changed, 23 insertions, 19 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index a95cc8673..c4836aeaf 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -80,7 +80,7 @@ void sender_commit(struct sender_state *s) { static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { - host->rrdpush_sender_connected = 0; + __atomic_clear(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST); if(host->rrdpush_sender_socket != -1) { close(host->rrdpush_sender_socket); @@ -89,20 +89,20 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { } static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) { - calculated_number *value = (calculated_number *)rv->value; + NETDATA_DOUBLE *value = (NETDATA_DOUBLE *)rv->value; buffer_sprintf( host->sender->build - , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n" + , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n" , rv->name , *value ); - debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value); + debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rv->name, *value); } void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) { - if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) { + if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && __atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)) { sender_start(host->sender); rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv); sender_commit(host->sender); @@ -172,8 +172,8 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) { } static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) { - host->labels.labels_flag |= LABEL_FLAG_UPDATE_STREAM; - host->labels.labels_flag &= ~LABEL_FLAG_STOP_STREAM; + rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_STOP); } void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) @@ -236,8 +236,8 @@ static inline long int parse_stream_version(RRDHOST *host, char *http) answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); if (!answer) { stream_version = 0; - host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM; - host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_STOP); + rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); } else { stream_version = parse_stream_version_for_errors(http); @@ -563,7 +563,7 @@ static void attempt_to_connect(struct sender_state *state) state->sent_bytes_on_this_connection = 0; // let the data collection threads know we are ready - state->host->rrdpush_sender_connected = 1; + __atomic_test_and_set(&state->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST); } else { // increase the failed connections counter @@ -724,17 +724,21 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { netdata_mutex_unlock(&host->sender->mutex); } -void sender_init(struct sender_state *s, RRDHOST *parent) { - memset(s, 0, sizeof(*s)); - s->host = parent; - s->buffer = cbuffer_new(1024, 1024*1024); - s->build = buffer_create(1); +void sender_init(RRDHOST *parent) +{ + if (parent->sender) + return; + + parent->sender = callocz(1, sizeof(*parent->sender)); + parent->sender->host = parent; + parent->sender->buffer = cbuffer_new(1024, 1024*1024); + parent->sender->build = buffer_create(1); #ifdef ENABLE_COMPRESSION - s->rrdpush_compression = default_compression_enabled; + parent->sender->rrdpush_compression = default_compression_enabled; if (default_compression_enabled) - s->compressor = create_compressor(); + parent->sender->compressor = create_compressor(); #endif - netdata_mutex_init(&s->mutex); + netdata_mutex_init(&parent->sender->mutex); } void *rrdpush_sender_thread(void *ptr) { @@ -770,7 +774,7 @@ void *rrdpush_sender_thread(void *ptr) { remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING // initialize rrdpush globals - s->host->rrdpush_sender_connected = 0; + __atomic_clear(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST); if(pipe(s->host->rrdpush_sender_pipe) == -1) { error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname); return NULL; |