summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/receiver.c28
-rw-r--r--streaming/rrdpush.c68
-rw-r--r--streaming/rrdpush.h2
-rw-r--r--streaming/sender.c42
4 files changed, 77 insertions, 63 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index d20658e65..0890ebbcd 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -217,9 +217,19 @@ static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t
// we need to receive data with LF to parse compression header
size_t ofs = 0;
int res = 0;
+ errno = 0;
while (ofs < size) {
do {
res = SSL_read(r->ssl.conn, buffer + ofs, 1);
+ // When either SSL_ERROR_SYSCALL (OpenSSL < 3.0) or SSL_ERROR_SSL(OpenSSL > 3.0) happens,
+ // the connection was lost https://www.openssl.org/docs/man3.0/man3/SSL_get_error.html,
+ // without the test we will have an infinite loop https://github.com/netdata/netdata/issues/13092
+ int local_ssl_err = SSL_get_error(r->ssl.conn, res);
+ if (local_ssl_err == SSL_ERROR_SYSCALL || local_ssl_err == SSL_ERROR_SSL) {
+ error("The SSL connection has error SSL_ERROR_SYSCALL(%d) and system is registering errno = %d",
+ local_ssl_err, errno);
+ return 1;
+ }
} while (res == 0);
if (res < 0)
@@ -507,6 +517,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rrdpush_api_key
, rrdpush_send_charts_matching
, rpt->system_info
+ , 0
);
if(!rpt->host) {
@@ -660,7 +671,14 @@ static int rrdpush_receive(struct receiver_state *rpt)
*/
// rpt->host->connected_senders++;
- rpt->host->labels.labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM;
+ if(rpt->stream_version > 0) {
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+ }
+ else {
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ }
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
@@ -682,13 +700,15 @@ static int rrdpush_receive(struct receiver_state *rpt)
cd.version = rpt->stream_version;
-#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
+#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
if (netdata_cloud_setting)
aclk_host_state_update(rpt->host, 1);
#endif
+ rrdcontext_host_child_connected(rpt->host);
+
size_t count = streaming_parser(rpt, &cd, fp);
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
@@ -696,7 +716,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
rpt->client_port, count);
-#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ rrdcontext_host_child_disconnected(rpt->host);
+
+#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
if (netdata_cloud_setting)
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 77774d8d3..b73f24633 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -75,7 +75,8 @@ int rrdpush_init() {
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
- rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
+ rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time);
+
#ifdef ENABLE_COMPRESSION
default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
"enable compression", default_compression_enabled);
@@ -97,10 +98,10 @@ int rrdpush_init() {
}
}
- char *invalid_certificate = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", "no");
+ bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO);
- if ( !strcmp(invalid_certificate,"yes")){
- if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
+ if(invalid_certificate == CONFIG_BOOLEAN_YES){
+ if(netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
info("Netdata is configured to accept invalid SSL certificate.");
netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
}
@@ -193,20 +194,15 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
}
// chart labels
+static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls);
+ return 1;
+}
void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) {
- struct label_index *labels_c = &st->state->labels;
- if (labels_c) {
- netdata_rwlock_rdlock(&host->labels.labels_rwlock);
- struct label *lbl = labels_c->head;
- while(lbl) {
- buffer_sprintf(host->sender->build,
- "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source);
-
- lbl = lbl->next;
- }
- if (labels_c->head)
+ if (st->state && st->state->chart_labels) {
+ if(rrdlabels_walkthrough_read(st->state->chart_labels, send_clabels_callback, host->sender->build) > 0)
buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n");
- netdata_rwlock_unlock(&host->labels.labels_rwlock);
}
}
@@ -277,11 +273,11 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
RRDSETVAR *rs;
for(rs = st->variables; rs ;rs = rs->next) {
if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) {
- calculated_number *value = (calculated_number *) rs->value;
+ NETDATA_DOUBLE *value = (NETDATA_DOUBLE *) rs->value;
buffer_sprintf(
host->sender->build
- , "VARIABLE CHART %s = " CALCULATED_NUMBER_FORMAT "\n"
+ , "VARIABLE CHART %s = " NETDATA_DOUBLE_FORMAT "\n"
, rs->variable
, *value
);
@@ -338,7 +334,7 @@ void rrdset_done_push(RRDSET *st) {
rrdpush_sender_thread_spawn(host);
// Handle non-connected case
- if(unlikely(!host->rrdpush_sender_connected)) {
+ if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) {
if(unlikely(!host->rrdpush_sender_error_shown))
error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname);
host->rrdpush_sender_error_shown = 1;
@@ -364,41 +360,30 @@ void rrdset_done_push(RRDSET *st) {
}
// labels
+static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
+ return 1;
+}
void rrdpush_send_labels(RRDHOST *host) {
- if (!host->labels.head || !(host->labels.labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels.labels_flag & LABEL_FLAG_STOP_STREAM))
+ if (!host->host_labels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP)))
return;
sender_start(host->sender);
- rrdhost_rdlock(host);
- netdata_rwlock_rdlock(&host->labels.labels_rwlock);
-
- struct label *label_i = host->labels.head;
- while(label_i) {
- buffer_sprintf(host->sender->build
- , "LABEL \"%s\" = %d %s\n"
- , label_i->key
- , (int)label_i->label_source
- , label_i->value);
- label_i = label_i->next;
- }
-
- buffer_sprintf(host->sender->build
- , "OVERWRITE %s\n", "labels");
-
- netdata_rwlock_unlock(&host->labels.labels_rwlock);
- rrdhost_unlock(host);
+ rrdlabels_walkthrough_read(host->host_labels, send_labels_callback, host->sender->build);
+ buffer_sprintf(host->sender->build, "OVERWRITE %s\n", "labels");
sender_commit(host->sender);
if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
- host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
}
void rrdpush_claimed_id(RRDHOST *host)
{
- if(unlikely(!host->rrdpush_send_enabled || !host->rrdpush_sender_connected))
+ if(unlikely(!host->rrdpush_send_enabled || !__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)))
return;
if(host->sender->version < STREAM_VERSION_CLAIM)
@@ -504,6 +489,9 @@ struct rrdpush_destinations *destinations_init(const char *dests) {
// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
void rrdpush_sender_thread_stop(RRDHOST *host) {
+ if (!host->sender)
+ return;
+
netdata_mutex_lock(&host->sender->mutex);
netdata_thread_t thr = 0;
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 6efe8cd6f..1eb39cc6c 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -160,7 +160,7 @@ extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
extern unsigned int remote_clock_resync_iterations;
-extern void sender_init(struct sender_state *s, RRDHOST *parent);
+extern void sender_init(RRDHOST *parent);
extern struct rrdpush_destinations *destinations_init(const char *destinations);
void sender_start(struct sender_state *s);
void sender_commit(struct sender_state *s);
diff --git a/streaming/sender.c b/streaming/sender.c
index a95cc8673..c4836aeaf 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -80,7 +80,7 @@ void sender_commit(struct sender_state *s) {
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
- host->rrdpush_sender_connected = 0;
+ __atomic_clear(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
if(host->rrdpush_sender_socket != -1) {
close(host->rrdpush_sender_socket);
@@ -89,20 +89,20 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
}
static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) {
- calculated_number *value = (calculated_number *)rv->value;
+ NETDATA_DOUBLE *value = (NETDATA_DOUBLE *)rv->value;
buffer_sprintf(
host->sender->build
- , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n"
+ , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
, rv->name
, *value
);
- debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value);
+ debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rv->name, *value);
}
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
- if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) {
+ if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && __atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)) {
sender_start(host->sender);
rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
sender_commit(host->sender);
@@ -172,8 +172,8 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
}
static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) {
- host->labels.labels_flag |= LABEL_FLAG_UPDATE_STREAM;
- host->labels.labels_flag &= ~LABEL_FLAG_STOP_STREAM;
+ rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
}
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
@@ -236,8 +236,8 @@ static inline long int parse_stream_version(RRDHOST *host, char *http)
answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
if (!answer) {
stream_version = 0;
- host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
- host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
}
else {
stream_version = parse_stream_version_for_errors(http);
@@ -563,7 +563,7 @@ static void attempt_to_connect(struct sender_state *state)
state->sent_bytes_on_this_connection = 0;
// let the data collection threads know we are ready
- state->host->rrdpush_sender_connected = 1;
+ __atomic_test_and_set(&state->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
}
else {
// increase the failed connections counter
@@ -724,17 +724,21 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
netdata_mutex_unlock(&host->sender->mutex);
}
-void sender_init(struct sender_state *s, RRDHOST *parent) {
- memset(s, 0, sizeof(*s));
- s->host = parent;
- s->buffer = cbuffer_new(1024, 1024*1024);
- s->build = buffer_create(1);
+void sender_init(RRDHOST *parent)
+{
+ if (parent->sender)
+ return;
+
+ parent->sender = callocz(1, sizeof(*parent->sender));
+ parent->sender->host = parent;
+ parent->sender->buffer = cbuffer_new(1024, 1024*1024);
+ parent->sender->build = buffer_create(1);
#ifdef ENABLE_COMPRESSION
- s->rrdpush_compression = default_compression_enabled;
+ parent->sender->rrdpush_compression = default_compression_enabled;
if (default_compression_enabled)
- s->compressor = create_compressor();
+ parent->sender->compressor = create_compressor();
#endif
- netdata_mutex_init(&s->mutex);
+ netdata_mutex_init(&parent->sender->mutex);
}
void *rrdpush_sender_thread(void *ptr) {
@@ -770,7 +774,7 @@ void *rrdpush_sender_thread(void *ptr) {
remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
// initialize rrdpush globals
- s->host->rrdpush_sender_connected = 0;
+ __atomic_clear(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
if(pipe(s->host->rrdpush_sender_pipe) == -1) {
error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname);
return NULL;