diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 28 | ||||
-rw-r--r-- | streaming/rrdpush.c | 68 | ||||
-rw-r--r-- | streaming/rrdpush.h | 2 | ||||
-rw-r--r-- | streaming/sender.c | 42 |
4 files changed, 77 insertions, 63 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index d20658e6..0890ebbc 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -217,9 +217,19 @@ static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t // we need to receive data with LF to parse compression header size_t ofs = 0; int res = 0; + errno = 0; while (ofs < size) { do { res = SSL_read(r->ssl.conn, buffer + ofs, 1); + // When either SSL_ERROR_SYSCALL (OpenSSL < 3.0) or SSL_ERROR_SSL(OpenSSL > 3.0) happens, + // the connection was lost https://www.openssl.org/docs/man3.0/man3/SSL_get_error.html, + // without the test we will have an infinite loop https://github.com/netdata/netdata/issues/13092 + int local_ssl_err = SSL_get_error(r->ssl.conn, res); + if (local_ssl_err == SSL_ERROR_SYSCALL || local_ssl_err == SSL_ERROR_SSL) { + error("The SSL connection has error SSL_ERROR_SYSCALL(%d) and system is registering errno = %d", + local_ssl_err, errno); + return 1; + } } while (res == 0); if (res < 0) @@ -507,6 +517,7 @@ static int rrdpush_receive(struct receiver_state *rpt) , rrdpush_api_key , rrdpush_send_charts_matching , rpt->system_info + , 0 ); if(!rpt->host) { @@ -660,7 +671,14 @@ static int rrdpush_receive(struct receiver_state *rpt) */ // rpt->host->connected_senders++; - rpt->host->labels.labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM; + if(rpt->stream_version > 0) { + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); + } + else { + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + } if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { @@ -682,13 +700,15 @@ static int rrdpush_receive(struct receiver_state *rpt) cd.version = rpt->stream_version; -#if defined(ENABLE_NEW_CLOUD_PROTOCOL) +#ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected if (netdata_cloud_setting) aclk_host_state_update(rpt->host, 1); #endif + rrdcontext_host_child_connected(rpt->host); + size_t count = streaming_parser(rpt, &cd, fp); log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, @@ -696,7 +716,9 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); -#if defined(ENABLE_NEW_CLOUD_PROTOCOL) + rrdcontext_host_child_disconnected(rpt->host); + +#ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected if (netdata_cloud_setting) diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 77774d8d..b73f2463 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -75,7 +75,8 @@ int rrdpush_init() { default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", ""); default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", ""); default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*"); - rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time); + rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time); + #ifdef ENABLE_COMPRESSION default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enable compression", default_compression_enabled); @@ -97,10 +98,10 @@ int rrdpush_init() { } } - char *invalid_certificate = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", "no"); + bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO); - if ( !strcmp(invalid_certificate,"yes")){ - if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ + if(invalid_certificate == CONFIG_BOOLEAN_YES){ + if(netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ info("Netdata is configured to accept invalid SSL certificate."); netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE; } @@ -193,20 +194,15 @@ static inline int need_to_send_chart_definition(RRDSET *st) { } // chart labels +static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + BUFFER *wb = (BUFFER *)data; + buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls); + return 1; +} void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) { - struct label_index *labels_c = &st->state->labels; - if (labels_c) { - netdata_rwlock_rdlock(&host->labels.labels_rwlock); - struct label *lbl = labels_c->head; - while(lbl) { - buffer_sprintf(host->sender->build, - "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source); - - lbl = lbl->next; - } - if (labels_c->head) + if (st->state && st->state->chart_labels) { + if(rrdlabels_walkthrough_read(st->state->chart_labels, send_clabels_callback, host->sender->build) > 0) buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n"); - netdata_rwlock_unlock(&host->labels.labels_rwlock); } } @@ -277,11 +273,11 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { RRDSETVAR *rs; for(rs = st->variables; rs ;rs = rs->next) { if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) { - calculated_number *value = (calculated_number *) rs->value; + NETDATA_DOUBLE *value = (NETDATA_DOUBLE *) rs->value; buffer_sprintf( host->sender->build - , "VARIABLE CHART %s = " CALCULATED_NUMBER_FORMAT "\n" + , "VARIABLE CHART %s = " NETDATA_DOUBLE_FORMAT "\n" , rs->variable , *value ); @@ -338,7 +334,7 @@ void rrdset_done_push(RRDSET *st) { rrdpush_sender_thread_spawn(host); // Handle non-connected case - if(unlikely(!host->rrdpush_sender_connected)) { + if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) { if(unlikely(!host->rrdpush_sender_error_shown)) error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname); host->rrdpush_sender_error_shown = 1; @@ -364,41 +360,30 @@ void rrdset_done_push(RRDSET *st) { } // labels +static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + BUFFER *wb = (BUFFER *)data; + buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value); + return 1; +} void rrdpush_send_labels(RRDHOST *host) { - if (!host->labels.head || !(host->labels.labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels.labels_flag & LABEL_FLAG_STOP_STREAM)) + if (!host->host_labels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP))) return; sender_start(host->sender); - rrdhost_rdlock(host); - netdata_rwlock_rdlock(&host->labels.labels_rwlock); - - struct label *label_i = host->labels.head; - while(label_i) { - buffer_sprintf(host->sender->build - , "LABEL \"%s\" = %d %s\n" - , label_i->key - , (int)label_i->label_source - , label_i->value); - label_i = label_i->next; - } - - buffer_sprintf(host->sender->build - , "OVERWRITE %s\n", "labels"); - - netdata_rwlock_unlock(&host->labels.labels_rwlock); - rrdhost_unlock(host); + rrdlabels_walkthrough_read(host->host_labels, send_labels_callback, host->sender->build); + buffer_sprintf(host->sender->build, "OVERWRITE %s\n", "labels"); sender_commit(host->sender); if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) error("STREAM %s [send]: cannot write to internal pipe", host->hostname); - host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); } void rrdpush_claimed_id(RRDHOST *host) { - if(unlikely(!host->rrdpush_send_enabled || !host->rrdpush_sender_connected)) + if(unlikely(!host->rrdpush_send_enabled || !__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) return; if(host->sender->version < STREAM_VERSION_CLAIM) @@ -504,6 +489,9 @@ struct rrdpush_destinations *destinations_init(const char *dests) { // The sender mutex guards thread creation, any spurious data is wiped on reconnection. void rrdpush_sender_thread_stop(RRDHOST *host) { + if (!host->sender) + return; + netdata_mutex_lock(&host->sender->mutex); netdata_thread_t thr = 0; diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 6efe8cd6..1eb39cc6 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -160,7 +160,7 @@ extern char *default_rrdpush_api_key; extern char *default_rrdpush_send_charts_matching; extern unsigned int remote_clock_resync_iterations; -extern void sender_init(struct sender_state *s, RRDHOST *parent); +extern void sender_init(RRDHOST *parent); extern struct rrdpush_destinations *destinations_init(const char *destinations); void sender_start(struct sender_state *s); void sender_commit(struct sender_state *s); diff --git a/streaming/sender.c b/streaming/sender.c index a95cc867..c4836aea 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -80,7 +80,7 @@ void sender_commit(struct sender_state *s) { static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { - host->rrdpush_sender_connected = 0; + __atomic_clear(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST); if(host->rrdpush_sender_socket != -1) { close(host->rrdpush_sender_socket); @@ -89,20 +89,20 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { } static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) { - calculated_number *value = (calculated_number *)rv->value; + NETDATA_DOUBLE *value = (NETDATA_DOUBLE *)rv->value; buffer_sprintf( host->sender->build - , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n" + , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n" , rv->name , *value ); - debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value); + debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rv->name, *value); } void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) { - if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) { + if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && __atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)) { sender_start(host->sender); rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv); sender_commit(host->sender); @@ -172,8 +172,8 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) { } static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) { - host->labels.labels_flag |= LABEL_FLAG_UPDATE_STREAM; - host->labels.labels_flag &= ~LABEL_FLAG_STOP_STREAM; + rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_STOP); } void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) @@ -236,8 +236,8 @@ static inline long int parse_stream_version(RRDHOST *host, char *http) answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); if (!answer) { stream_version = 0; - host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM; - host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_STOP); + rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); } else { stream_version = parse_stream_version_for_errors(http); @@ -563,7 +563,7 @@ static void attempt_to_connect(struct sender_state *state) state->sent_bytes_on_this_connection = 0; // let the data collection threads know we are ready - state->host->rrdpush_sender_connected = 1; + __atomic_test_and_set(&state->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST); } else { // increase the failed connections counter @@ -724,17 +724,21 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { netdata_mutex_unlock(&host->sender->mutex); } -void sender_init(struct sender_state *s, RRDHOST *parent) { - memset(s, 0, sizeof(*s)); - s->host = parent; - s->buffer = cbuffer_new(1024, 1024*1024); - s->build = buffer_create(1); +void sender_init(RRDHOST *parent) +{ + if (parent->sender) + return; + + parent->sender = callocz(1, sizeof(*parent->sender)); + parent->sender->host = parent; + parent->sender->buffer = cbuffer_new(1024, 1024*1024); + parent->sender->build = buffer_create(1); #ifdef ENABLE_COMPRESSION - s->rrdpush_compression = default_compression_enabled; + parent->sender->rrdpush_compression = default_compression_enabled; if (default_compression_enabled) - s->compressor = create_compressor(); + parent->sender->compressor = create_compressor(); #endif - netdata_mutex_init(&s->mutex); + netdata_mutex_init(&parent->sender->mutex); } void *rrdpush_sender_thread(void *ptr) { @@ -770,7 +774,7 @@ void *rrdpush_sender_thread(void *ptr) { remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING // initialize rrdpush globals - s->host->rrdpush_sender_connected = 0; + __atomic_clear(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST); if(pipe(s->host->rrdpush_sender_pipe) == -1) { error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname); return NULL; |