diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 28 |
1 files changed, 25 insertions, 3 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index d20658e6..0890ebbc 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -217,9 +217,19 @@ static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t // we need to receive data with LF to parse compression header size_t ofs = 0; int res = 0; + errno = 0; while (ofs < size) { do { res = SSL_read(r->ssl.conn, buffer + ofs, 1); + // When either SSL_ERROR_SYSCALL (OpenSSL < 3.0) or SSL_ERROR_SSL(OpenSSL > 3.0) happens, + // the connection was lost https://www.openssl.org/docs/man3.0/man3/SSL_get_error.html, + // without the test we will have an infinite loop https://github.com/netdata/netdata/issues/13092 + int local_ssl_err = SSL_get_error(r->ssl.conn, res); + if (local_ssl_err == SSL_ERROR_SYSCALL || local_ssl_err == SSL_ERROR_SSL) { + error("The SSL connection has error SSL_ERROR_SYSCALL(%d) and system is registering errno = %d", + local_ssl_err, errno); + return 1; + } } while (res == 0); if (res < 0) @@ -507,6 +517,7 @@ static int rrdpush_receive(struct receiver_state *rpt) , rrdpush_api_key , rrdpush_send_charts_matching , rpt->system_info + , 0 ); if(!rpt->host) { @@ -660,7 +671,14 @@ static int rrdpush_receive(struct receiver_state *rpt) */ // rpt->host->connected_senders++; - rpt->host->labels.labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM; + if(rpt->stream_version > 0) { + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); + } + else { + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + } if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { @@ -682,13 +700,15 @@ static int rrdpush_receive(struct receiver_state *rpt) cd.version = rpt->stream_version; -#if defined(ENABLE_NEW_CLOUD_PROTOCOL) +#ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected if (netdata_cloud_setting) aclk_host_state_update(rpt->host, 1); #endif + rrdcontext_host_child_connected(rpt->host); + size_t count = streaming_parser(rpt, &cd, fp); log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, @@ -696,7 +716,9 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); -#if defined(ENABLE_NEW_CLOUD_PROTOCOL) + rrdcontext_host_child_disconnected(rpt->host); + +#ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud // new child connected if (netdata_cloud_setting) |