summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-01-26 18:05:15 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-01-26 18:05:42 +0000
commit112b5b91647c3dea45cc1c9bc364df526c8012f1 (patch)
tree450af925135ec664c4310a1eb28b69481094ee2a /streaming/sender.c
parentReleasing debian version 1.32.1-2. (diff)
downloadnetdata-112b5b91647c3dea45cc1c9bc364df526c8012f1.tar.xz
netdata-112b5b91647c3dea45cc1c9bc364df526c8012f1.zip
Merging upstream version 1.33.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c120
1 files changed, 89 insertions, 31 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 0abfac180..916d809a9 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -15,9 +15,26 @@ void sender_start(struct sender_state *s) {
// Collector thread finishing a transmission
void sender_commit(struct sender_state *s) {
- if(cbuffer_add_unsafe(s->host->sender->buffer, buffer_tostring(s->host->sender->build),
- s->host->sender->build->len))
+ char *src = (char *)buffer_tostring(s->host->sender->build);
+ size_t src_len = s->host->sender->build->len;
+#ifdef ENABLE_COMPRESSION
+ do {
+ if (src && src_len) {
+ if (s->compressor && s->rrdpush_compression) {
+ src_len = s->compressor->compress(s->compressor, src, src_len, &src);
+ if (!src_len) {
+ error("Compression error - data discarded");
+ break;
+ }
+ }
+ if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ s->overflow = 1;
+ }
+ } while (0);
+#else
+ if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
s->overflow = 1;
+#endif
buffer_flush(s->build);
netdata_mutex_unlock(&s->mutex);
}
@@ -147,6 +164,35 @@ void rrdpush_clean_encoded(stream_encoded_t *se)
freez(se->kernel_version);
}
+static inline long int parse_stream_version(RRDHOST *host, char *http)
+{
+ long int stream_version = -1;
+ int answer = -1;
+ char *stream_version_start = strchr(http, '=');
+ if (stream_version_start) {
+ stream_version_start++;
+ stream_version = strtol(stream_version_start, NULL, 10);
+ answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(stream_version_start - http));
+ if (!answer) {
+ rrdpush_set_flags_to_newest_stream(host);
+ }
+ } else {
+ answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
+ if (!answer) {
+ stream_version = 1;
+ rrdpush_set_flags_to_newest_stream(host);
+ } else {
+ answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
+ if (!answer) {
+ stream_version = 0;
+ host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
+ host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ }
+ }
+ }
+ return stream_version;
+}
+
static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout,
struct sender_state *s) {
@@ -214,7 +260,21 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
char http[HTTP_HEADER_SIZE + 1];
int eol = snprintfz(http, HTTP_HEADER_SIZE,
- "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&abbrev_timezone=%s&utc_offset=%d&hops=%d&tags=%s&ver=%u"
+ "STREAM "
+ "key=%s"
+ "&hostname=%s"
+ "&registry_hostname=%s"
+ "&machine_guid=%s"
+ "&update_every=%d"
+ "&os=%s"
+ "&timezone=%s"
+ "&abbrev_timezone=%s"
+ "&utc_offset=%d"
+ "&hops=%d"
+ "&ml_capable=%d"
+ "&ml_enabled=%d"
+ "&tags=%s"
+ "&ver=%u"
"&NETDATA_SYSTEM_OS_NAME=%s"
"&NETDATA_SYSTEM_OS_ID=%s"
"&NETDATA_SYSTEM_OS_ID_LIKE=%s"
@@ -253,6 +313,8 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
, host->abbrev_timezone
, host->utc_offset
, host->system_info->hops + 1
+ , host->system_info->ml_capable
+ , host->system_info->ml_enabled
, (host->tags) ? host->tags : ""
, STREAMING_PROTOCOL_CURRENT_VERSION
, se.os_name
@@ -339,32 +401,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
http[received] = '\0';
debug(D_STREAM, "Response to sender from far end: %s", http);
- int answer = -1;
- char *version_start = strchr(http, '=');
- int32_t version = -1;
- if(version_start) {
- version_start++;
- version = (int32_t)strtol(version_start, NULL, 10);
- answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(version_start - http));
- if(!answer) {
- rrdpush_set_flags_to_newest_stream(host);
- }
- } else {
- answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
- if(!answer) {
- version = 1;
- rrdpush_set_flags_to_newest_stream(host);
- }
- else {
- answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
- if(!answer) {
- version = 0;
- host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
- host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
- }
- }
- }
-
+ int32_t version = (int32_t)parse_stream_version(host, http);
if(version == -1) {
error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to);
rrdpush_sender_thread_close_socket(host);
@@ -372,10 +409,26 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
}
s->version = version;
+#ifdef ENABLE_COMPRESSION
+ s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
+ if(s->rrdpush_compression)
+ {
+ // parent supports compression
+ if(s->compressor)
+ s->compressor->reset(s->compressor);
+ }
+ else {
+ //parent does not support compression or has compression disabled
+ debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
+ s->version = STREAM_VERSION_CLABELS;
+ }
+#endif //ENABLE_COMPRESSION
+
+
info("STREAM %s [send to %s]: established communication with a parent using protocol version %d - ready to send metrics..."
, host->hostname
, s->connected_to
- , version);
+ , s->version);
if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, s->connected_to);
@@ -565,6 +618,11 @@ void sender_init(struct sender_state *s, RRDHOST *parent) {
s->host = parent;
s->buffer = cbuffer_new(1024, 1024*1024);
s->build = buffer_create(1);
+#ifdef ENABLE_COMPRESSION
+ s->rrdpush_compression = default_compression_enabled;
+ if (default_compression_enabled)
+ s->compressor = create_compressor();
+#endif
netdata_mutex_init(&s->mutex);
}
@@ -630,7 +688,7 @@ void *rrdpush_sender_thread(void *ptr) {
if (s->version >= VERSION_GAP_FILLING) {
time_t now = now_realtime_sec();
sender_start(s);
- buffer_sprintf(s->build, "TIMESTAMP %ld", now);
+ buffer_sprintf(s->build, "TIMESTAMP %"PRId64"", (int64_t)now);
sender_commit(s);
}
rrdpush_claimed_id(s->host);