summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--streaming/receiver.c28
1 files changed, 25 insertions, 3 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index d20658e6..0890ebbc 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -217,9 +217,19 @@ static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t
// we need to receive data with LF to parse compression header
size_t ofs = 0;
int res = 0;
+ errno = 0;
while (ofs < size) {
do {
res = SSL_read(r->ssl.conn, buffer + ofs, 1);
+ // When either SSL_ERROR_SYSCALL (OpenSSL < 3.0) or SSL_ERROR_SSL(OpenSSL > 3.0) happens,
+ // the connection was lost https://www.openssl.org/docs/man3.0/man3/SSL_get_error.html,
+ // without the test we will have an infinite loop https://github.com/netdata/netdata/issues/13092
+ int local_ssl_err = SSL_get_error(r->ssl.conn, res);
+ if (local_ssl_err == SSL_ERROR_SYSCALL || local_ssl_err == SSL_ERROR_SSL) {
+ error("The SSL connection has error SSL_ERROR_SYSCALL(%d) and system is registering errno = %d",
+ local_ssl_err, errno);
+ return 1;
+ }
} while (res == 0);
if (res < 0)
@@ -507,6 +517,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rrdpush_api_key
, rrdpush_send_charts_matching
, rpt->system_info
+ , 0
);
if(!rpt->host) {
@@ -660,7 +671,14 @@ static int rrdpush_receive(struct receiver_state *rpt)
*/
// rpt->host->connected_senders++;
- rpt->host->labels.labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM;
+ if(rpt->stream_version > 0) {
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+ }
+ else {
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ }
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
@@ -682,13 +700,15 @@ static int rrdpush_receive(struct receiver_state *rpt)
cd.version = rpt->stream_version;
-#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
+#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
if (netdata_cloud_setting)
aclk_host_state_update(rpt->host, 1);
#endif
+ rrdcontext_host_child_connected(rpt->host);
+
size_t count = streaming_parser(rpt, &cd, fp);
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
@@ -696,7 +716,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
rpt->client_port, count);
-#if defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ rrdcontext_host_child_disconnected(rpt->host);
+
+#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
if (netdata_cloud_setting)