summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c42
1 files changed, 23 insertions, 19 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index a95cc8673..c4836aeaf 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -80,7 +80,7 @@ void sender_commit(struct sender_state *s) {
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
- host->rrdpush_sender_connected = 0;
+ __atomic_clear(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
if(host->rrdpush_sender_socket != -1) {
close(host->rrdpush_sender_socket);
@@ -89,20 +89,20 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
}
static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) {
- calculated_number *value = (calculated_number *)rv->value;
+ NETDATA_DOUBLE *value = (NETDATA_DOUBLE *)rv->value;
buffer_sprintf(
host->sender->build
- , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n"
+ , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
, rv->name
, *value
);
- debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value);
+ debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rv->name, *value);
}
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
- if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) {
+ if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && __atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)) {
sender_start(host->sender);
rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
sender_commit(host->sender);
@@ -172,8 +172,8 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
}
static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) {
- host->labels.labels_flag |= LABEL_FLAG_UPDATE_STREAM;
- host->labels.labels_flag &= ~LABEL_FLAG_STOP_STREAM;
+ rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
}
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
@@ -236,8 +236,8 @@ static inline long int parse_stream_version(RRDHOST *host, char *http)
answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
if (!answer) {
stream_version = 0;
- host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
- host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
}
else {
stream_version = parse_stream_version_for_errors(http);
@@ -563,7 +563,7 @@ static void attempt_to_connect(struct sender_state *state)
state->sent_bytes_on_this_connection = 0;
// let the data collection threads know we are ready
- state->host->rrdpush_sender_connected = 1;
+ __atomic_test_and_set(&state->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
}
else {
// increase the failed connections counter
@@ -724,17 +724,21 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
netdata_mutex_unlock(&host->sender->mutex);
}
-void sender_init(struct sender_state *s, RRDHOST *parent) {
- memset(s, 0, sizeof(*s));
- s->host = parent;
- s->buffer = cbuffer_new(1024, 1024*1024);
- s->build = buffer_create(1);
+void sender_init(RRDHOST *parent)
+{
+ if (parent->sender)
+ return;
+
+ parent->sender = callocz(1, sizeof(*parent->sender));
+ parent->sender->host = parent;
+ parent->sender->buffer = cbuffer_new(1024, 1024*1024);
+ parent->sender->build = buffer_create(1);
#ifdef ENABLE_COMPRESSION
- s->rrdpush_compression = default_compression_enabled;
+ parent->sender->rrdpush_compression = default_compression_enabled;
if (default_compression_enabled)
- s->compressor = create_compressor();
+ parent->sender->compressor = create_compressor();
#endif
- netdata_mutex_init(&s->mutex);
+ netdata_mutex_init(&parent->sender->mutex);
}
void *rrdpush_sender_thread(void *ptr) {
@@ -770,7 +774,7 @@ void *rrdpush_sender_thread(void *ptr) {
remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
// initialize rrdpush globals
- s->host->rrdpush_sender_connected = 0;
+ __atomic_clear(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
if(pipe(s->host->rrdpush_sender_pipe) == -1) {
error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname);
return NULL;