diff options
Diffstat (limited to '')
-rw-r--r-- | streaming/sender.c | 120 |
1 files changed, 89 insertions, 31 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 0abfac180..916d809a9 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -15,9 +15,26 @@ void sender_start(struct sender_state *s) { // Collector thread finishing a transmission void sender_commit(struct sender_state *s) { - if(cbuffer_add_unsafe(s->host->sender->buffer, buffer_tostring(s->host->sender->build), - s->host->sender->build->len)) + char *src = (char *)buffer_tostring(s->host->sender->build); + size_t src_len = s->host->sender->build->len; +#ifdef ENABLE_COMPRESSION + do { + if (src && src_len) { + if (s->compressor && s->rrdpush_compression) { + src_len = s->compressor->compress(s->compressor, src, src_len, &src); + if (!src_len) { + error("Compression error - data discarded"); + break; + } + } + if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) + s->overflow = 1; + } + } while (0); +#else + if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len)) s->overflow = 1; +#endif buffer_flush(s->build); netdata_mutex_unlock(&s->mutex); } @@ -147,6 +164,35 @@ void rrdpush_clean_encoded(stream_encoded_t *se) freez(se->kernel_version); } +static inline long int parse_stream_version(RRDHOST *host, char *http) +{ + long int stream_version = -1; + int answer = -1; + char *stream_version_start = strchr(http, '='); + if (stream_version_start) { + stream_version_start++; + stream_version = strtol(stream_version_start, NULL, 10); + answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(stream_version_start - http)); + if (!answer) { + rrdpush_set_flags_to_newest_stream(host); + } + } else { + answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2)); + if (!answer) { + stream_version = 1; + rrdpush_set_flags_to_newest_stream(host); + } else { + answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); + if (!answer) { + stream_version = 0; + host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM; + host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + } + } + } + return stream_version; +} + static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) { @@ -214,7 +260,21 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po char http[HTTP_HEADER_SIZE + 1]; int eol = snprintfz(http, HTTP_HEADER_SIZE, - "STREAM key=%s&hostname=%s®istry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&abbrev_timezone=%s&utc_offset=%d&hops=%d&tags=%s&ver=%u" + "STREAM " + "key=%s" + "&hostname=%s" + "®istry_hostname=%s" + "&machine_guid=%s" + "&update_every=%d" + "&os=%s" + "&timezone=%s" + "&abbrev_timezone=%s" + "&utc_offset=%d" + "&hops=%d" + "&ml_capable=%d" + "&ml_enabled=%d" + "&tags=%s" + "&ver=%u" "&NETDATA_SYSTEM_OS_NAME=%s" "&NETDATA_SYSTEM_OS_ID=%s" "&NETDATA_SYSTEM_OS_ID_LIKE=%s" @@ -253,6 +313,8 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po , host->abbrev_timezone , host->utc_offset , host->system_info->hops + 1 + , host->system_info->ml_capable + , host->system_info->ml_enabled , (host->tags) ? host->tags : "" , STREAMING_PROTOCOL_CURRENT_VERSION , se.os_name @@ -339,32 +401,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po http[received] = '\0'; debug(D_STREAM, "Response to sender from far end: %s", http); - int answer = -1; - char *version_start = strchr(http, '='); - int32_t version = -1; - if(version_start) { - version_start++; - version = (int32_t)strtol(version_start, NULL, 10); - answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(version_start - http)); - if(!answer) { - rrdpush_set_flags_to_newest_stream(host); - } - } else { - answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2)); - if(!answer) { - version = 1; - rrdpush_set_flags_to_newest_stream(host); - } - else { - answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); - if(!answer) { - version = 0; - host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM; - host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; - } - } - } - + int32_t version = (int32_t)parse_stream_version(host, http); if(version == -1) { error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to); rrdpush_sender_thread_close_socket(host); @@ -372,10 +409,26 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po } s->version = version; +#ifdef ENABLE_COMPRESSION + s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION)); + if(s->rrdpush_compression) + { + // parent supports compression + if(s->compressor) + s->compressor->reset(s->compressor); + } + else { + //parent does not support compression or has compression disabled + debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname); + s->version = STREAM_VERSION_CLABELS; + } +#endif //ENABLE_COMPRESSION + + info("STREAM %s [send to %s]: established communication with a parent using protocol version %d - ready to send metrics..." , host->hostname , s->connected_to - , version); + , s->version); if(sock_setnonblock(host->rrdpush_sender_socket) < 0) error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, s->connected_to); @@ -565,6 +618,11 @@ void sender_init(struct sender_state *s, RRDHOST *parent) { s->host = parent; s->buffer = cbuffer_new(1024, 1024*1024); s->build = buffer_create(1); +#ifdef ENABLE_COMPRESSION + s->rrdpush_compression = default_compression_enabled; + if (default_compression_enabled) + s->compressor = create_compressor(); +#endif netdata_mutex_init(&s->mutex); } @@ -630,7 +688,7 @@ void *rrdpush_sender_thread(void *ptr) { if (s->version >= VERSION_GAP_FILLING) { time_t now = now_realtime_sec(); sender_start(s); - buffer_sprintf(s->build, "TIMESTAMP %ld", now); + buffer_sprintf(s->build, "TIMESTAMP %"PRId64"", (int64_t)now); sender_commit(s); } rrdpush_claimed_id(s->host); |