summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c62
1 files changed, 48 insertions, 14 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 916d809a9..72259c3ab 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -13,24 +13,43 @@ void sender_start(struct sender_state *s) {
buffer_flush(s->build);
}
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
+
+#ifdef ENABLE_COMPRESSION
+/*
+* In case of stream compression buffer oveflow
+* Inform the user through the error log file and
+* deactivate compression by downgrading the stream protocol.
+*/
+static inline void deactivate_compression(struct sender_state *s)
+{
+ error("STREAM_COMPRESSION: Deactivating compression to avoid stream corruption");
+ default_compression_enabled = 0;
+ s->rrdpush_compression = 0;
+ s->version = STREAM_VERSION_CLABELS;
+ error("STREAM_COMPRESSION %s [send to %s]: Restarting connection without compression", s->host->hostname, s->connected_to);
+ rrdpush_sender_thread_close_socket(s->host);
+}
+#endif
+
// Collector thread finishing a transmission
void sender_commit(struct sender_state *s) {
char *src = (char *)buffer_tostring(s->host->sender->build);
size_t src_len = s->host->sender->build->len;
#ifdef ENABLE_COMPRESSION
- do {
- if (src && src_len) {
- if (s->compressor && s->rrdpush_compression) {
- src_len = s->compressor->compress(s->compressor, src, src_len, &src);
- if (!src_len) {
- error("Compression error - data discarded");
- break;
- }
+ if (src && src_len) {
+ if (s->compressor && s->rrdpush_compression) {
+ src_len = s->compressor->compress(s->compressor, src, src_len, &src);
+ if (!src_len) {
+ deactivate_compression(s);
+ buffer_flush(s->build);
+ netdata_mutex_unlock(&s->mutex);
+ return;
}
- if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
- s->overflow = 1;
}
- } while (0);
+ if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ s->overflow = 1;
+ }
#else
if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
s->overflow = 1;
@@ -252,6 +271,13 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
}
#endif
+#ifdef ENABLE_COMPRESSION
+// Negotiate stream VERSION_CLABELS if stream compression is not supported
+s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
+if(!s->rrdpush_compression)
+ s->version = STREAM_VERSION_CLABELS;
+#endif //ENABLE_COMPRESSION
+
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
*/
@@ -274,7 +300,10 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
"&ml_capable=%d"
"&ml_enabled=%d"
"&tags=%s"
- "&ver=%u"
+ "&ver=%d"
+ "&NETDATA_INSTANCE_CLOUD_TYPE=%s"
+ "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
+ "&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s"
"&NETDATA_SYSTEM_OS_NAME=%s"
"&NETDATA_SYSTEM_OS_ID=%s"
"&NETDATA_SYSTEM_OS_ID_LIKE=%s"
@@ -316,7 +345,10 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
, host->system_info->ml_capable
, host->system_info->ml_enabled
, (host->tags) ? host->tags : ""
- , STREAMING_PROTOCOL_CURRENT_VERSION
+ , s->version
+ , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
+ , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
+ , (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : ""
, se.os_name
, se.os_id
, (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
@@ -410,7 +442,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
s->version = version;
#ifdef ENABLE_COMPRESSION
- s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
+ s->rrdpush_compression = (s->rrdpush_compression && (s->version >= STREAM_VERSION_COMPRESSION));
if(s->rrdpush_compression)
{
// parent supports compression
@@ -420,6 +452,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
else {
//parent does not support compression or has compression disabled
debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
+ infoerr("Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
s->version = STREAM_VERSION_CLABELS;
}
#endif //ENABLE_COMPRESSION
@@ -664,6 +697,7 @@ void *rrdpush_sender_thread(void *ptr) {
error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname);
return NULL;
}
+ s->version = STREAMING_PROTOCOL_CURRENT_VERSION;
enum {
Collector,