diff options
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- | streaming/sender.c | 62 |
1 files changed, 48 insertions, 14 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 916d809a..72259c3a 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -13,24 +13,43 @@ void sender_start(struct sender_state *s) { buffer_flush(s->build); } +static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); + +#ifdef ENABLE_COMPRESSION +/* +* In case of stream compression buffer oveflow +* Inform the user through the error log file and +* deactivate compression by downgrading the stream protocol. +*/ +static inline void deactivate_compression(struct sender_state *s) +{ + error("STREAM_COMPRESSION: Deactivating compression to avoid stream corruption"); + default_compression_enabled = 0; + s->rrdpush_compression = 0; + s->version = STREAM_VERSION_CLABELS; + error("STREAM_COMPRESSION %s [send to %s]: Restarting connection without compression", s->host->hostname, s->connected_to); + rrdpush_sender_thread_close_socket(s->host); +} +#endif + // Collector thread finishing a transmission void sender_commit(struct sender_state *s) { char *src = (char *)buffer_tostring(s->host->sender->build); size_t src_len = s->host->sender->build->len; #ifdef ENABLE_COMPRESSION - do { - if (src && src_len) { - if (s->compressor && s->rrdpush_compression) { - src_len = s->compressor->compress(s->compressor, src, src_len, &src); - if (!src_len) { - error("Compression error - data discarded"); - break; - } + if (src && src_len) { + if (s->compressor && s->rrdpush_compression) { + src_len = s->compressor->compress(s->compressor, src, src_len, &src); + if (!src_len) { + deactivate_compression(s); + buffer_flush(s->build); + netdata_mutex_unlock(&s->mutex); + return; } - if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) - s->overflow = 1; } - } while (0); + if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) + s->overflow = 1; + } #else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) s->overflow = 1; @@ -252,6 +271,13 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po } #endif +#ifdef ENABLE_COMPRESSION +// Negotiate stream VERSION_CLABELS if stream compression is not supported +s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION)); +if(!s->rrdpush_compression) + s->version = STREAM_VERSION_CLABELS; +#endif //ENABLE_COMPRESSION + /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the version negotiation resulted in a high enough version. */ @@ -274,7 +300,10 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po "&ml_capable=%d" "&ml_enabled=%d" "&tags=%s" - "&ver=%u" + "&ver=%d" + "&NETDATA_INSTANCE_CLOUD_TYPE=%s" + "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s" + "&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s" "&NETDATA_SYSTEM_OS_NAME=%s" "&NETDATA_SYSTEM_OS_ID=%s" "&NETDATA_SYSTEM_OS_ID_LIKE=%s" @@ -316,7 +345,10 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po , host->system_info->ml_capable , host->system_info->ml_enabled , (host->tags) ? host->tags : "" - , STREAMING_PROTOCOL_CURRENT_VERSION + , s->version + , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : "" + , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : "" + , (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : "" , se.os_name , se.os_id , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : "" @@ -410,7 +442,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po s->version = version; #ifdef ENABLE_COMPRESSION - s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION)); + s->rrdpush_compression = (s->rrdpush_compression && (s->version >= STREAM_VERSION_COMPRESSION)); if(s->rrdpush_compression) { // parent supports compression @@ -420,6 +452,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po else { //parent does not support compression or has compression disabled debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname); + infoerr("Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname); s->version = STREAM_VERSION_CLABELS; } #endif //ENABLE_COMPRESSION @@ -664,6 +697,7 @@ void *rrdpush_sender_thread(void *ptr) { error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname); return NULL; } + s->version = STREAMING_PROTOCOL_CURRENT_VERSION; enum { Collector, |