summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:22 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:22 +0000
commitc21c3b0befeb46a51b6bf3758ffa30813bea0ff0 (patch)
tree9754ff1ca740f6346cf8483ec915d4054bc5da2d /streaming
parentAdding upstream version 1.43.2. (diff)
downloadnetdata-upstream/1.44.3.tar.xz
netdata-upstream/1.44.3.zip
Adding upstream version 1.44.3.upstream/1.44.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/README.md1
-rw-r--r--streaming/common.h9
-rw-r--r--streaming/compression.c780
-rw-r--r--streaming/compression.h175
-rw-r--r--streaming/compression_brotli.c142
-rw-r--r--streaming/compression_brotli.h15
-rw-r--r--streaming/compression_gzip.c164
-rw-r--r--streaming/compression_gzip.h15
-rw-r--r--streaming/compression_lz4.c143
-rw-r--r--streaming/compression_lz4.h19
-rw-r--r--streaming/compression_zstd.c163
-rw-r--r--streaming/compression_zstd.h19
-rw-r--r--streaming/receiver.c420
-rw-r--r--streaming/replication.c115
-rw-r--r--streaming/rrdpush.c419
-rw-r--r--streaming/rrdpush.h224
-rw-r--r--streaming/sender.c581
-rw-r--r--streaming/stream.conf13
18 files changed, 2625 insertions, 792 deletions
diff --git a/streaming/README.md b/streaming/README.md
index a27167bc1..03de090e0 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -44,6 +44,7 @@ node**. This file is automatically generated by Netdata the first time it is sta
| `buffer size bytes` | `10485760` | The size of the buffer to use when sending metrics. The default `10485760` equals a buffer of 10MB, which is good for 60 seconds of data. Increase this if you expect latencies higher than that. The buffer is flushed on reconnect. |
| `reconnect delay seconds` | `5` | How long to wait until retrying to connect to the parent node. |
| `initial clock resync iterations` | `60` | Sync the clock of charts for how many seconds when starting. |
+| `parent using h2o` | `no` | Set to yes if you are connecting to parent trough it's h2o webserver/port. Currently there is no reason to set this to `yes` unless you are testing the new h2o based netdata webserver. When production ready this will be set to `yes` as default. |
### `[API_KEY]` and `[MACHINE_GUID]` sections
diff --git a/streaming/common.h b/streaming/common.h
new file mode 100644
index 000000000..b7292f4d0
--- /dev/null
+++ b/streaming/common.h
@@ -0,0 +1,9 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef STREAMING_COMMON_H
+#define STREAMING_COMMON_H
+
+#define NETDATA_STREAM_URL "/stream"
+#define NETDATA_STREAM_PROTO_NAME "netdata_stream/2.0"
+
+#endif /* STREAMING_COMMON_H */
diff --git a/streaming/compression.c b/streaming/compression.c
index 6d4a128b0..a94c8a0a6 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -1,181 +1,707 @@
-#include "rrdpush.h"
+// SPDX-License-Identifier: GPL-3.0-or-later
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-#include "lz4.h"
+#include "compression.h"
-#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
+#include "compression_gzip.h"
-/*
- * Reset compressor state for a new stream
- */
-void rrdpush_compressor_reset(struct compressor_state *state) {
- if(!state->initialized) {
- state->initialized = true;
+#ifdef ENABLE_LZ4
+#include "compression_lz4.h"
+#endif
+
+#ifdef ENABLE_ZSTD
+#include "compression_zstd.h"
+#endif
- state->stream.lz4_stream = LZ4_createStream();
- state->stream.input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2);
- state->stream.input_ring_buffer = callocz(1, state->stream.input_ring_buffer_size);
- state->compression_result_buffer_size = 0;
+#ifdef ENABLE_BROTLI
+#include "compression_brotli.h"
+#endif
+
+int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX] = {
+ [COMPRESSION_ALGORITHM_NONE] = 0,
+ [COMPRESSION_ALGORITHM_ZSTD] = 3, // 1 (faster) - 22 (smaller)
+ [COMPRESSION_ALGORITHM_LZ4] = 1, // 1 (smaller) - 9 (faster)
+ [COMPRESSION_ALGORITHM_BROTLI] = 3, // 0 (faster) - 11 (smaller)
+ [COMPRESSION_ALGORITHM_GZIP] = 1, // 1 (faster) - 9 (smaller)
+};
+
+void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order) {
+ // empty all slots
+ for(size_t i = 0; i < COMPRESSION_ALGORITHM_MAX ;i++)
+ rpt->config.compression_priorities[i] = STREAM_CAP_NONE;
+
+ char *s = strdupz(order);
+
+ char *words[COMPRESSION_ALGORITHM_MAX + 100] = { NULL };
+ size_t num_words = quoted_strings_splitter_pluginsd(s, words, COMPRESSION_ALGORITHM_MAX + 100);
+ size_t slot = 0;
+ STREAM_CAPABILITIES added = STREAM_CAP_NONE;
+ for(size_t i = 0; i < num_words && slot < COMPRESSION_ALGORITHM_MAX ;i++) {
+ if((STREAM_CAP_ZSTD_AVAILABLE) && strcasecmp(words[i], "zstd") == 0 && !(added & STREAM_CAP_ZSTD)) {
+ rpt->config.compression_priorities[slot++] = STREAM_CAP_ZSTD;
+ added |= STREAM_CAP_ZSTD;
+ }
+ else if((STREAM_CAP_LZ4_AVAILABLE) && strcasecmp(words[i], "lz4") == 0 && !(added & STREAM_CAP_LZ4)) {
+ rpt->config.compression_priorities[slot++] = STREAM_CAP_LZ4;
+ added |= STREAM_CAP_LZ4;
+ }
+ else if((STREAM_CAP_BROTLI_AVAILABLE) && strcasecmp(words[i], "brotli") == 0 && !(added & STREAM_CAP_BROTLI)) {
+ rpt->config.compression_priorities[slot++] = STREAM_CAP_BROTLI;
+ added |= STREAM_CAP_BROTLI;
+ }
+ else if(strcasecmp(words[i], "gzip") == 0 && !(added & STREAM_CAP_GZIP)) {
+ rpt->config.compression_priorities[slot++] = STREAM_CAP_GZIP;
+ added |= STREAM_CAP_GZIP;
+ }
}
- LZ4_resetStream_fast(state->stream.lz4_stream);
+ freez(s);
+
+ // make sure all participate
+ if((STREAM_CAP_ZSTD_AVAILABLE) && slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_ZSTD))
+ rpt->config.compression_priorities[slot++] = STREAM_CAP_ZSTD;
+ if((STREAM_CAP_LZ4_AVAILABLE) && slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_LZ4))
+ rpt->config.compression_priorities[slot++] = STREAM_CAP_LZ4;
+ if((STREAM_CAP_BROTLI_AVAILABLE) && slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_BROTLI))
+ rpt->config.compression_priorities[slot++] = STREAM_CAP_BROTLI;
+ if(slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_GZIP))
+ rpt->config.compression_priorities[slot++] = STREAM_CAP_GZIP;
+}
+
+void rrdpush_select_receiver_compression_algorithm(struct receiver_state *rpt) {
+ if (!rpt->config.rrdpush_compression)
+ rpt->capabilities &= ~STREAM_CAP_COMPRESSIONS_AVAILABLE;
+
+ // select the right compression before sending our capabilities to the child
+ if(stream_has_more_than_one_capability_of(rpt->capabilities, STREAM_CAP_COMPRESSIONS_AVAILABLE)) {
+ STREAM_CAPABILITIES compressions = rpt->capabilities & STREAM_CAP_COMPRESSIONS_AVAILABLE;
+ for(int i = 0; i < COMPRESSION_ALGORITHM_MAX; i++) {
+ STREAM_CAPABILITIES c = rpt->config.compression_priorities[i];
+
+ if(!(c & STREAM_CAP_COMPRESSIONS_AVAILABLE))
+ continue;
+
+ if(compressions & c) {
+ STREAM_CAPABILITIES exclude = compressions;
+ exclude &= ~c;
- state->stream.input_ring_buffer_pos = 0;
+ rpt->capabilities &= ~exclude;
+ break;
+ }
+ }
+ }
}
-/*
- * Destroy compressor state and all related data
- */
-void rrdpush_compressor_destroy(struct compressor_state *state) {
- if (state->stream.lz4_stream) {
- LZ4_freeStream(state->stream.lz4_stream);
- state->stream.lz4_stream = NULL;
+bool rrdpush_compression_initialize(struct sender_state *s) {
+ rrdpush_compressor_destroy(&s->compressor);
+
+ // IMPORTANT
+ // KEEP THE SAME ORDER IN DECOMPRESSION
+
+ if(stream_has_capability(s, STREAM_CAP_ZSTD))
+ s->compressor.algorithm = COMPRESSION_ALGORITHM_ZSTD;
+ else if(stream_has_capability(s, STREAM_CAP_LZ4))
+ s->compressor.algorithm = COMPRESSION_ALGORITHM_LZ4;
+ else if(stream_has_capability(s, STREAM_CAP_BROTLI))
+ s->compressor.algorithm = COMPRESSION_ALGORITHM_BROTLI;
+ else if(stream_has_capability(s, STREAM_CAP_GZIP))
+ s->compressor.algorithm = COMPRESSION_ALGORITHM_GZIP;
+ else
+ s->compressor.algorithm = COMPRESSION_ALGORITHM_NONE;
+
+ if(s->compressor.algorithm != COMPRESSION_ALGORITHM_NONE) {
+ s->compressor.level = rrdpush_compression_levels[s->compressor.algorithm];
+ rrdpush_compressor_init(&s->compressor);
+ return true;
}
- freez(state->stream.input_ring_buffer);
- state->stream.input_ring_buffer = NULL;
+ return false;
+}
- freez(state->compression_result_buffer);
- state->compression_result_buffer = NULL;
+bool rrdpush_decompression_initialize(struct receiver_state *rpt) {
+ rrdpush_decompressor_destroy(&rpt->decompressor);
+
+ // IMPORTANT
+ // KEEP THE SAME ORDER IN COMPRESSION
+
+ if(stream_has_capability(rpt, STREAM_CAP_ZSTD))
+ rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_ZSTD;
+ else if(stream_has_capability(rpt, STREAM_CAP_LZ4))
+ rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_LZ4;
+ else if(stream_has_capability(rpt, STREAM_CAP_BROTLI))
+ rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_BROTLI;
+ else if(stream_has_capability(rpt, STREAM_CAP_GZIP))
+ rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_GZIP;
+ else
+ rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_NONE;
+
+ if(rpt->decompressor.algorithm != COMPRESSION_ALGORITHM_NONE) {
+ rrdpush_decompressor_init(&rpt->decompressor);
+ return true;
+ }
- state->initialized = false;
+ return false;
}
/*
- * Compress the given block of data
- * Compressed data will remain in the internal buffer until the next invocation
- * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
- * or 0 in case of error
- */
-size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out) {
- if(unlikely(!state || !size || !out))
- return 0;
-
- if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) {
- netdata_log_error("RRDPUSH COMPRESS: Compression Failed - Message size %lu above compression buffer limit: %d",
- (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE);
- return 0;
+* In case of stream compression buffer overflow
+* Inform the user through the error log file and
+* deactivate compression by downgrading the stream protocol.
+*/
+void rrdpush_compression_deactivate(struct sender_state *s) {
+ switch(s->compressor.algorithm) {
+ case COMPRESSION_ALGORITHM_MAX:
+ case COMPRESSION_ALGORITHM_NONE:
+ netdata_log_error("STREAM_COMPRESSION: compression error on 'host:%s' without any compression enabled. Ignoring error.",
+ rrdhost_hostname(s->host));
+ break;
+
+ case COMPRESSION_ALGORITHM_GZIP:
+ netdata_log_error("STREAM_COMPRESSION: GZIP compression error on 'host:%s'. Disabling GZIP for this node.",
+ rrdhost_hostname(s->host));
+ s->disabled_capabilities |= STREAM_CAP_GZIP;
+ break;
+
+ case COMPRESSION_ALGORITHM_LZ4:
+ netdata_log_error("STREAM_COMPRESSION: LZ4 compression error on 'host:%s'. Disabling ZSTD for this node.",
+ rrdhost_hostname(s->host));
+ s->disabled_capabilities |= STREAM_CAP_LZ4;
+ break;
+
+ case COMPRESSION_ALGORITHM_ZSTD:
+ netdata_log_error("STREAM_COMPRESSION: ZSTD compression error on 'host:%s'. Disabling ZSTD for this node.",
+ rrdhost_hostname(s->host));
+ s->disabled_capabilities |= STREAM_CAP_ZSTD;
+ break;
+
+ case COMPRESSION_ALGORITHM_BROTLI:
+ netdata_log_error("STREAM_COMPRESSION: BROTLI compression error on 'host:%s'. Disabling BROTLI for this node.",
+ rrdhost_hostname(s->host));
+ s->disabled_capabilities |= STREAM_CAP_BROTLI;
+ break;
}
+}
- size_t max_dst_size = LZ4_COMPRESSBOUND(size);
- size_t data_size = max_dst_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
+// ----------------------------------------------------------------------------
+// compressor public API
- if (!state->compression_result_buffer) {
- state->compression_result_buffer = mallocz(data_size);
- state->compression_result_buffer_size = data_size;
+void rrdpush_compressor_init(struct compressor_state *state) {
+ switch(state->algorithm) {
+#ifdef ENABLE_ZSTD
+ case COMPRESSION_ALGORITHM_ZSTD:
+ rrdpush_compressor_init_zstd(state);
+ break;
+#endif
+
+#ifdef ENABLE_LZ4
+ case COMPRESSION_ALGORITHM_LZ4:
+ rrdpush_compressor_init_lz4(state);
+ break;
+#endif
+
+#ifdef ENABLE_BROTLI
+ case COMPRESSION_ALGORITHM_BROTLI:
+ rrdpush_compressor_init_brotli(state);
+ break;
+#endif
+
+ default:
+ case COMPRESSION_ALGORITHM_GZIP:
+ rrdpush_compressor_init_gzip(state);
+ break;
}
- else if(unlikely(state->compression_result_buffer_size < data_size)) {
- state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size);
- state->compression_result_buffer_size = data_size;
+
+ simple_ring_buffer_reset(&state->input);
+ simple_ring_buffer_reset(&state->output);
+}
+
+void rrdpush_compressor_destroy(struct compressor_state *state) {
+ switch(state->algorithm) {
+#ifdef ENABLE_ZSTD
+ case COMPRESSION_ALGORITHM_ZSTD:
+ rrdpush_compressor_destroy_zstd(state);
+ break;
+#endif
+
+#ifdef ENABLE_LZ4
+ case COMPRESSION_ALGORITHM_LZ4:
+ rrdpush_compressor_destroy_lz4(state);
+ break;
+#endif
+
+#ifdef ENABLE_BROTLI
+ case COMPRESSION_ALGORITHM_BROTLI:
+ rrdpush_compressor_destroy_brotli(state);
+ break;
+#endif
+
+ default:
+ case COMPRESSION_ALGORITHM_GZIP:
+ rrdpush_compressor_destroy_gzip(state);
+ break;
}
- // the ring buffer always has space for LZ4_MAX_MSG_SIZE
- memcpy(state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos, data, size);
+ state->initialized = false;
- // this call needs the last 64K of our previous data
- // they are available in the ring buffer
- long int compressed_data_size = LZ4_compress_fast_continue(
- state->stream.lz4_stream,
- state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos,
- state->compression_result_buffer + RRDPUSH_COMPRESSION_SIGNATURE_SIZE,
- (int)size,
- (int)max_dst_size,
- 1);
+ simple_ring_buffer_destroy(&state->input);
+ simple_ring_buffer_destroy(&state->output);
+}
+
+size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out) {
+ size_t ret = 0;
+
+ switch(state->algorithm) {
+#ifdef ENABLE_ZSTD
+ case COMPRESSION_ALGORITHM_ZSTD:
+ ret = rrdpush_compress_zstd(state, data, size, out);
+ break;
+#endif
- if (compressed_data_size < 0) {
- netdata_log_error("Data compression error: %ld", compressed_data_size);
+#ifdef ENABLE_LZ4
+ case COMPRESSION_ALGORITHM_LZ4:
+ ret = rrdpush_compress_lz4(state, data, size, out);
+ break;
+#endif
+
+#ifdef ENABLE_BROTLI
+ case COMPRESSION_ALGORITHM_BROTLI:
+ ret = rrdpush_compress_brotli(state, data, size, out);
+ break;
+#endif
+
+ default:
+ case COMPRESSION_ALGORITHM_GZIP:
+ ret = rrdpush_compress_gzip(state, data, size, out);
+ break;
+ }
+
+ if(unlikely(ret >= COMPRESSION_MAX_CHUNK)) {
+ netdata_log_error("RRDPUSH_COMPRESS: compressed data is %zu bytes, which is >= than the max chunk size %d",
+ ret, COMPRESSION_MAX_CHUNK);
return 0;
}
- // update the next writing position of the ring buffer
- state->stream.input_ring_buffer_pos += size;
- if(unlikely(state->stream.input_ring_buffer_pos >= state->stream.input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE))
- state->stream.input_ring_buffer_pos = 0;
+ return ret;
+}
- // update the signature header
- uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
- *(uint32_t *)state->compression_result_buffer = len | RRDPUSH_COMPRESSION_SIGNATURE;
- *out = state->compression_result_buffer;
- netdata_log_debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
- return compressed_data_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
+// ----------------------------------------------------------------------------
+// decompressor public API
+
+void rrdpush_decompressor_destroy(struct decompressor_state *state) {
+ if(unlikely(!state->initialized))
+ return;
+
+ switch(state->algorithm) {
+#ifdef ENABLE_ZSTD
+ case COMPRESSION_ALGORITHM_ZSTD:
+ rrdpush_decompressor_destroy_zstd(state);
+ break;
+#endif
+
+#ifdef ENABLE_LZ4
+ case COMPRESSION_ALGORITHM_LZ4:
+ rrdpush_decompressor_destroy_lz4(state);
+ break;
+#endif
+
+#ifdef ENABLE_BROTLI
+ case COMPRESSION_ALGORITHM_BROTLI:
+ rrdpush_decompressor_destroy_brotli(state);
+ break;
+#endif
+
+ default:
+ case COMPRESSION_ALGORITHM_GZIP:
+ rrdpush_decompressor_destroy_gzip(state);
+ break;
+ }
+
+ simple_ring_buffer_destroy(&state->output);
+
+ state->initialized = false;
}
-/*
- * Decompress the compressed data in the internal buffer
- * Return the size of uncompressed data or 0 for error
- */
-size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
- if (unlikely(!state || !compressed_data || !compressed_size))
- return 0;
+void rrdpush_decompressor_init(struct decompressor_state *state) {
+ switch(state->algorithm) {
+#ifdef ENABLE_ZSTD
+ case COMPRESSION_ALGORITHM_ZSTD:
+ rrdpush_decompressor_init_zstd(state);
+ break;
+#endif
- if(unlikely(state->stream.read_at != state->stream.write_at))
- fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
+#ifdef ENABLE_LZ4
+ case COMPRESSION_ALGORITHM_LZ4:
+ rrdpush_decompressor_init_lz4(state);
+ break;
+#endif
+
+#ifdef ENABLE_BROTLI
+ case COMPRESSION_ALGORITHM_BROTLI:
+ rrdpush_decompressor_init_brotli(state);
+ break;
+#endif
- if (unlikely(state->stream.write_at >= state->stream.size / 2)) {
- state->stream.write_at = 0;
- state->stream.read_at = 0;
+ default:
+ case COMPRESSION_ALGORITHM_GZIP:
+ rrdpush_decompressor_init_gzip(state);
+ break;
}
- long int decompressed_size = LZ4_decompress_safe_continue(
- state->stream.lz4_stream
- , compressed_data
- , state->stream.buffer + state->stream.write_at
- , (int)compressed_size
- , (int)(state->stream.size - state->stream.write_at)
- );
+ state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
+ simple_ring_buffer_reset(&state->output);
+}
+
+size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+ if (unlikely(state->output.read_pos != state->output.write_pos))
+ fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
+
+ size_t ret = 0;
+
+ switch(state->algorithm) {
+#ifdef ENABLE_ZSTD
+ case COMPRESSION_ALGORITHM_ZSTD:
+ ret = rrdpush_decompress_zstd(state, compressed_data, compressed_size);
+ break;
+#endif
+
+#ifdef ENABLE_LZ4
+ case COMPRESSION_ALGORITHM_LZ4:
+ ret = rrdpush_decompress_lz4(state, compressed_data, compressed_size);
+ break;
+#endif
+
+#ifdef ENABLE_BROTLI
+ case COMPRESSION_ALGORITHM_BROTLI:
+ ret = rrdpush_decompress_brotli(state, compressed_data, compressed_size);
+ break;
+#endif
+
+ default:
+ case COMPRESSION_ALGORITHM_GZIP:
+ ret = rrdpush_decompress_gzip(state, compressed_data, compressed_size);
+ break;
+ }
- if (unlikely(decompressed_size < 0)) {
- netdata_log_error("RRDPUSH DECOMPRESS: decompressor returned negative decompressed bytes: %ld", decompressed_size);
+ // for backwards compatibility we cannot check for COMPRESSION_MAX_MSG_SIZE,
+ // because old children may send this big payloads.
+ if(unlikely(ret > COMPRESSION_MAX_CHUNK)) {
+ netdata_log_error("RRDPUSH_DECOMPRESS: decompressed data is %zu bytes, which is bigger than the max msg size %d",
+ ret, COMPRESSION_MAX_CHUNK);
return 0;
}
- if(unlikely(decompressed_size + state->stream.write_at > state->stream.size))
- fatal("RRDPUSH DECOMPRESS: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, "
- "exceeding the buffer by %zu"
- , state->stream.size
- , state->stream.write_at
- , decompressed_size
- , (size_t)(state->stream.write_at + decompressed_size - state->stream.size)
- );
+ return ret;
+}
+
+// ----------------------------------------------------------------------------
+// unit test
+
+static inline long int my_random (void) {
+ return random();
+}
+
+void unittest_generate_random_name(char *dst, size_t size) {
+ if(size < 7)
+ size = 7;
- state->stream.write_at += decompressed_size;
+ size_t len = 5 + my_random() % (size - 6);
- // statistics
- state->total_compressed += compressed_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
- state->total_uncompressed += decompressed_size;
- state->packet_count++;
+ for(size_t i = 0; i < len ; i++) {
+ if(my_random() % 2 == 0)
+ dst[i] = 'A' + my_random() % 26;
+ else
+ dst[i] = 'a' + my_random() % 26;
+ }
- return decompressed_size;
+ dst[len] = '\0';
}
-void rrdpush_decompressor_reset(struct decompressor_state *state) {
- if(!state->initialized) {
- state->initialized = true;
- state->stream.lz4_stream = LZ4_createStreamDecode();
- state->stream.size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2;
- state->stream.buffer = mallocz(state->stream.size);
+void unittest_generate_message(BUFFER *wb, time_t now_s, size_t counter) {
+ bool with_slots = true;
+ NUMBER_ENCODING integer_encoding = NUMBER_ENCODING_BASE64;
+ NUMBER_ENCODING doubles_encoding = NUMBER_ENCODING_BASE64;
+ time_t update_every = 1;
+ time_t point_end_time_s = now_s;
+ time_t wall_clock_time_s = now_s;
+ size_t chart_slot = counter + 1;
+ size_t dimensions = 2 + my_random() % 5;
+ char chart[RRD_ID_LENGTH_MAX + 1] = "name";
+ unittest_generate_random_name(chart, 5 + my_random() % 30);
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, chart_slot);
}
- LZ4_setStreamDecode(state->stream.lz4_stream, NULL, 0);
+ buffer_fast_strcat(wb, " '", 2);
+ buffer_strcat(wb, chart);
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, update_every);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s);
+ buffer_fast_strcat(wb, " ", 1);
+ if(point_end_time_s == wall_clock_time_s)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time_s);
+ buffer_fast_strcat(wb, "\n", 1);
+
+
+ for(size_t d = 0; d < dimensions ;d++) {
+ size_t dim_slot = d + 1;
+ char dim_id[RRD_ID_LENGTH_MAX + 1] = "dimension";
+ unittest_generate_random_name(dim_id, 10 + my_random() % 20);
+ int64_t last_collected_value = (my_random() % 2 == 0) ? (int64_t)(counter + d) : (int64_t)my_random();
+ NETDATA_DOUBLE value = (my_random() % 2 == 0) ? (NETDATA_DOUBLE)my_random() / ((NETDATA_DOUBLE)my_random() + 1) : (NETDATA_DOUBLE)last_collected_value;
+ SN_FLAGS flags = (my_random() % 1000 == 0) ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
+ buffer_strcat(wb, dim_id);
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_int64_encoded(wb, integer_encoding, last_collected_value);
+ buffer_fast_strcat(wb, " ", 1);
+
+ if((NETDATA_DOUBLE)last_collected_value == value)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_print_netdata_double_encoded(wb, doubles_encoding, value);
+
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, flags, true);
+ buffer_fast_strcat(wb, "\n", 1);
+ }
- state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
- state->stream.write_at = 0;
- state->stream.read_at = 0;
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
}
-void rrdpush_decompressor_destroy(struct decompressor_state *state) {
- if(unlikely(!state->initialized))
- return;
+int unittest_rrdpush_compression_speed(compression_algorithm_t algorithm, const char *name) {
+ fprintf(stderr, "\nTesting streaming compression speed with %s\n", name);
+
+ struct compressor_state cctx = {
+ .initialized = false,
+ .algorithm = algorithm,
+ };
+ struct decompressor_state dctx = {
+ .initialized = false,
+ .algorithm = algorithm,
+ };
+
+ rrdpush_compressor_init(&cctx);
+ rrdpush_decompressor_init(&dctx);
+
+ int errors = 0;
+
+ BUFFER *wb = buffer_create(COMPRESSION_MAX_MSG_SIZE, NULL);
+ time_t now_s = now_realtime_sec();
+ usec_t compression_ut = 0;
+ usec_t decompression_ut = 0;
+ size_t bytes_compressed = 0;
+ size_t bytes_uncompressed = 0;
+
+ usec_t compression_started_ut = now_monotonic_usec();
+ usec_t decompression_started_ut = compression_started_ut;
+
+ for(int i = 0; i < 10000 ;i++) {
+ compression_started_ut = now_monotonic_usec();
+ decompression_ut += compression_started_ut - decompression_started_ut;
+
+ buffer_flush(wb);
+ while(buffer_strlen(wb) < COMPRESSION_MAX_MSG_SIZE - 1024)
+ unittest_generate_message(wb, now_s, i);
+
+ const char *txt = buffer_tostring(wb);
+ size_t txt_len = buffer_strlen(wb);
+ bytes_uncompressed += txt_len;
+
+ const char *out;
+ size_t size = rrdpush_compress(&cctx, txt, txt_len, &out);
+
+ bytes_compressed += size;
+ decompression_started_ut = now_monotonic_usec();
+ compression_ut += decompression_started_ut - compression_started_ut;
+
+ if(size == 0) {
+ fprintf(stderr, "iteration %d: compressed size %zu is zero\n",
+ i, size);
+ errors++;
+ goto cleanup;
+ }
+ else if(size >= COMPRESSION_MAX_CHUNK) {
+ fprintf(stderr, "iteration %d: compressed size %zu exceeds max allowed size\n",
+ i, size);
+ errors++;
+ goto cleanup;
+ }
+ else {
+ size_t dtxt_len = rrdpush_decompress(&dctx, out, size);
+ char *dtxt = (char *) &dctx.output.data[dctx.output.read_pos];
+
+ if(rrdpush_decompressed_bytes_in_buffer(&dctx) != dtxt_len) {
+ fprintf(stderr, "iteration %d: decompressed size %zu does not rrdpush_decompressed_bytes_in_buffer() %zu\n",
+ i, dtxt_len, rrdpush_decompressed_bytes_in_buffer(&dctx)
+ );
+ errors++;
+ goto cleanup;
+ }
+
+ if(!dtxt_len) {
+ fprintf(stderr, "iteration %d: decompressed size is zero\n", i);
+ errors++;
+ goto cleanup;
+ }
+ else if(dtxt_len != txt_len) {
+ fprintf(stderr, "iteration %d: decompressed size %zu does not match original size %zu\n",
+ i, dtxt_len, txt_len
+ );
+ errors++;
+ goto cleanup;
+ }
+ else {
+ if(memcmp(txt, dtxt, txt_len) != 0) {
+ fprintf(stderr, "iteration %d: decompressed data '%s' do not match original data length %zu\n",
+ i, dtxt, txt_len);
+ errors++;
+ goto cleanup;
+ }
+ }
+ }
+
+ // here we are supposed to copy the data and advance the position
+ dctx.output.read_pos += rrdpush_decompressed_bytes_in_buffer(&dctx);
+ }
+
+cleanup:
+ rrdpush_compressor_destroy(&cctx);
+ rrdpush_decompressor_destroy(&dctx);
+
+ if(errors)
+ fprintf(stderr, "Compression with %s: FAILED (%d errors)\n", name, errors);
+ else
+ fprintf(stderr, "Compression with %s: OK "
+ "(compression %zu usec, decompression %zu usec, bytes raw %zu, compressed %zu, savings ratio %0.2f%%)\n",
+ name, compression_ut, decompression_ut,
+ bytes_uncompressed, bytes_compressed,
+ 100.0 - (double)bytes_compressed * 100.0 / (double)bytes_uncompressed);
- if (state->stream.lz4_stream) {
- LZ4_freeStreamDecode(state->stream.lz4_stream);
- state->stream.lz4_stream = NULL;
+ return errors;
+}
+
+int unittest_rrdpush_compression(compression_algorithm_t algorithm, const char *name) {
+ fprintf(stderr, "\nTesting streaming compression with %s\n", name);
+
+ struct compressor_state cctx = {
+ .initialized = false,
+ .algorithm = algorithm,
+ };
+ struct decompressor_state dctx = {
+ .initialized = false,
+ .algorithm = algorithm,
+ };
+
+ char txt[COMPRESSION_MAX_MSG_SIZE];
+
+ rrdpush_compressor_init(&cctx);
+ rrdpush_decompressor_init(&dctx);
+
+ int errors = 0;
+
+ memset(txt, '=', COMPRESSION_MAX_MSG_SIZE);
+
+ for(int i = 0; i < COMPRESSION_MAX_MSG_SIZE ;i++) {
+ txt[i] = 'A' + (i % 26);
+ size_t txt_len = i + 1;
+
+ const char *out;
+ size_t size = rrdpush_compress(&cctx, txt, txt_len, &out);
+
+ if(size == 0) {
+ fprintf(stderr, "iteration %d: compressed size %zu is zero\n",
+ i, size);
+ errors++;
+ goto cleanup;
+ }
+ else if(size >= COMPRESSION_MAX_CHUNK) {
+ fprintf(stderr, "iteration %d: compressed size %zu exceeds max allowed size\n",
+ i, size);
+ errors++;
+ goto cleanup;
+ }
+ else {
+ size_t dtxt_len = rrdpush_decompress(&dctx, out, size);
+ char *dtxt = (char *) &dctx.output.data[dctx.output.read_pos];
+
+ if(rrdpush_decompressed_bytes_in_buffer(&dctx) != dtxt_len) {
+ fprintf(stderr, "iteration %d: decompressed size %zu does not rrdpush_decompressed_bytes_in_buffer() %zu\n",
+ i, dtxt_len, rrdpush_decompressed_bytes_in_buffer(&dctx)
+ );
+ errors++;
+ goto cleanup;
+ }
+
+ if(!dtxt_len) {
+ fprintf(stderr, "iteration %d: decompressed size is zero\n", i);
+ errors++;
+ goto cleanup;
+ }
+ else if(dtxt_len != txt_len) {
+ fprintf(stderr, "iteration %d: decompressed size %zu does not match original size %zu\n",
+ i, dtxt_len, txt_len
+ );
+ errors++;
+ goto cleanup;
+ }
+ else {
+ if(memcmp(txt, dtxt, txt_len) != 0) {
+ txt[txt_len] = '\0';
+ dtxt[txt_len + 5] = '\0';
+
+ fprintf(stderr, "iteration %d: decompressed data '%s' do not match original data '%s' of length %zu\n",
+ i, dtxt, txt, txt_len);
+ errors++;
+ goto cleanup;
+ }
+ }
+ }
+
+ // fill the compressed buffer with garbage
+ memset((void *)out, 'x', size);
+
+ // here we are supposed to copy the data and advance the position
+ dctx.output.read_pos += rrdpush_decompressed_bytes_in_buffer(&dctx);
}
- freez(state->stream.buffer);
- state->stream.buffer = NULL;
+cleanup:
+ rrdpush_compressor_destroy(&cctx);
+ rrdpush_decompressor_destroy(&dctx);
- state->initialized = false;
+ if(errors)
+ fprintf(stderr, "Compression with %s: FAILED (%d errors)\n", name, errors);
+ else
+ fprintf(stderr, "Compression with %s: OK\n", name);
+
+ return errors;
}
-#endif
+int unittest_rrdpush_compressions(void) {
+ int ret = 0;
+
+ ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_ZSTD, "ZSTD");
+ ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_LZ4, "LZ4");
+ ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_BROTLI, "BROTLI");
+ ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_GZIP, "GZIP");
+
+ ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_ZSTD, "ZSTD");
+ ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_LZ4, "LZ4");
+ ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_BROTLI, "BROTLI");
+ ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_GZIP, "GZIP");
+
+ return ret;
+}
diff --git a/streaming/compression.h b/streaming/compression.h
new file mode 100644
index 000000000..a67f65b83
--- /dev/null
+++ b/streaming/compression.h
@@ -0,0 +1,175 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdpush.h"
+
+#ifndef NETDATA_RRDPUSH_COMPRESSION_H
+#define NETDATA_RRDPUSH_COMPRESSION_H 1
+
+// signature MUST end with a newline
+
+#if COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)
+#error "COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)"
+#endif
+
+typedef uint32_t rrdpush_signature_t;
+#define RRDPUSH_COMPRESSION_SIGNATURE ((rrdpush_signature_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
+#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((rrdpush_signature_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
+#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE sizeof(rrdpush_signature_t)
+
+static inline rrdpush_signature_t rrdpush_compress_encode_signature(size_t compressed_data_size) {
+ rrdpush_signature_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
+ return len | RRDPUSH_COMPRESSION_SIGNATURE;
+}
+
+typedef enum {
+ COMPRESSION_ALGORITHM_NONE = 0,
+ COMPRESSION_ALGORITHM_ZSTD,
+ COMPRESSION_ALGORITHM_LZ4,
+ COMPRESSION_ALGORITHM_GZIP,
+ COMPRESSION_ALGORITHM_BROTLI,
+
+ // terminator
+ COMPRESSION_ALGORITHM_MAX,
+} compression_algorithm_t;
+
+extern int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX];
+
+// this defines the order the algorithms will be selected by the receiver (parent)
+#define RRDPUSH_COMPRESSION_ALGORITHMS_ORDER "zstd lz4 brotli gzip"
+
+// ----------------------------------------------------------------------------
+
+typedef struct simple_ring_buffer {
+ const char *data;
+ size_t size;
+ size_t read_pos;
+ size_t write_pos;
+} SIMPLE_RING_BUFFER;
+
+static inline void simple_ring_buffer_reset(SIMPLE_RING_BUFFER *b) {
+ b->read_pos = b->write_pos = 0;
+}
+
+static inline void simple_ring_buffer_make_room(SIMPLE_RING_BUFFER *b, size_t size) {
+ if(b->write_pos + size > b->size) {
+ if(!b->size)
+ b->size = COMPRESSION_MAX_CHUNK;
+ else
+ b->size *= 2;
+
+ if(b->write_pos + size > b->size)
+ b->size += size;
+
+ b->data = (const char *)reallocz((void *)b->data, b->size);
+ }
+}
+
+static inline void simple_ring_buffer_append_data(SIMPLE_RING_BUFFER *b, const void *data, size_t size) {
+ simple_ring_buffer_make_room(b, size);
+ memcpy((void *)(b->data + b->write_pos), data, size);
+ b->write_pos += size;
+}
+
+static inline void simple_ring_buffer_destroy(SIMPLE_RING_BUFFER *b) {
+ freez((void *)b->data);
+ b->data = NULL;
+ b->read_pos = b->write_pos = b->size = 0;
+}
+
+// ----------------------------------------------------------------------------
+
+struct compressor_state {
+ bool initialized;
+ compression_algorithm_t algorithm;
+
+ SIMPLE_RING_BUFFER input;
+ SIMPLE_RING_BUFFER output;
+
+ int level;
+ void *stream;
+
+ struct {
+ size_t total_compressed;
+ size_t total_uncompressed;
+ size_t total_compressions;
+ } sender_locked;
+};
+
+void rrdpush_compressor_init(struct compressor_state *state);
+void rrdpush_compressor_destroy(struct compressor_state *state);
+size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out);
+
+// ----------------------------------------------------------------------------
+
+struct decompressor_state {
+ bool initialized;
+ compression_algorithm_t algorithm;
+ size_t signature_size;
+
+ size_t total_compressed;
+ size_t total_uncompressed;
+ size_t total_compressions;
+
+ SIMPLE_RING_BUFFER output;
+
+ void *stream;
+};
+
+void rrdpush_decompressor_destroy(struct decompressor_state *state);
+void rrdpush_decompressor_init(struct decompressor_state *state);
+size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
+
+static inline size_t rrdpush_decompress_decode_signature(const char *data, size_t data_size) {
+ if (unlikely(!data || !data_size))
+ return 0;
+
+ if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
+ return 0;
+
+ rrdpush_signature_t sign = *(rrdpush_signature_t *)data;
+ if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
+ return 0;
+
+ size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
+ return length;
+}
+
+static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
+ if(unlikely(state->output.read_pos != state->output.write_pos))
+ fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
+
+ return rrdpush_decompress_decode_signature(header, header_size);
+}
+
+static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
+ if(unlikely(state->output.read_pos > state->output.write_pos))
+ fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
+
+ return state->output.write_pos - state->output.read_pos;
+}
+
+static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
+ if (unlikely(!state || !size || !dst))
+ return 0;
+
+ size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
+
+ if(unlikely(!remaining))
+ return 0;
+
+ size_t bytes_to_return = size;
+ if(bytes_to_return > remaining)
+ bytes_to_return = remaining;
+
+ memcpy(dst, state->output.data + state->output.read_pos, bytes_to_return);
+ state->output.read_pos += bytes_to_return;
+
+ if(unlikely(state->output.read_pos > state->output.write_pos))
+ fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
+
+ return bytes_to_return;
+}
+
+// ----------------------------------------------------------------------------
+
+#endif // NETDATA_RRDPUSH_COMPRESSION_H 1
diff --git a/streaming/compression_brotli.c b/streaming/compression_brotli.c
new file mode 100644
index 000000000..cf52f3bca
--- /dev/null
+++ b/streaming/compression_brotli.c
@@ -0,0 +1,142 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression_brotli.h"
+
+#ifdef ENABLE_BROTLI
+#include <brotli/encode.h>
+#include <brotli/decode.h>
+
+void rrdpush_compressor_init_brotli(struct compressor_state *state) {
+ if (!state->initialized) {
+ state->initialized = true;
+ state->stream = BrotliEncoderCreateInstance(NULL, NULL, NULL);
+
+ if (state->level < BROTLI_MIN_QUALITY) {
+ state->level = BROTLI_MIN_QUALITY;
+ } else if (state->level > BROTLI_MAX_QUALITY) {
+ state->level = BROTLI_MAX_QUALITY;
+ }
+
+ BrotliEncoderSetParameter(state->stream, BROTLI_PARAM_QUALITY, state->level);
+ }
+}
+
+void rrdpush_compressor_destroy_brotli(struct compressor_state *state) {
+ if (state->stream) {
+ BrotliEncoderDestroyInstance(state->stream);
+ state->stream = NULL;
+ }
+}
+
+size_t rrdpush_compress_brotli(struct compressor_state *state, const char *data, size_t size, const char **out) {
+ if (unlikely(!state || !size || !out))
+ return 0;
+
+ simple_ring_buffer_make_room(&state->output, MAX(BrotliEncoderMaxCompressedSize(size), COMPRESSION_MAX_CHUNK));
+
+ size_t available_out = state->output.size;
+
+ size_t available_in = size;
+ const uint8_t *next_in = (const uint8_t *)data;
+ uint8_t *next_out = (uint8_t *)state->output.data;
+
+ if (!BrotliEncoderCompressStream(state->stream, BROTLI_OPERATION_FLUSH, &available_in, &next_in, &available_out, &next_out, NULL)) {
+ netdata_log_error("STREAM: Brotli compression failed.");
+ return 0;
+ }
+
+ if(available_in != 0) {
+ netdata_log_error("STREAM: BrotliEncoderCompressStream() did not use all the input buffer, %zu bytes out of %zu remain",
+ available_in, size);
+ return 0;
+ }
+
+ size_t compressed_size = state->output.size - available_out;
+ if(available_out == 0) {
+ netdata_log_error("STREAM: BrotliEncoderCompressStream() needs a bigger output buffer than the one we provided "
+ "(output buffer %zu bytes, compressed payload %zu bytes)",
+ state->output.size, size);
+ return 0;
+ }
+
+ if(compressed_size == 0) {
+ netdata_log_error("STREAM: BrotliEncoderCompressStream() did not produce any output from the input provided "
+ "(input buffer %zu bytes)",
+ size);
+ return 0;
+ }
+
+ state->sender_locked.total_compressions++;
+ state->sender_locked.total_uncompressed += size - available_in;
+ state->sender_locked.total_compressed += compressed_size;
+
+ *out = state->output.data;
+ return compressed_size;
+}
+
+void rrdpush_decompressor_init_brotli(struct decompressor_state *state) {
+ if (!state->initialized) {
+ state->initialized = true;
+ state->stream = BrotliDecoderCreateInstance(NULL, NULL, NULL);
+
+ simple_ring_buffer_make_room(&state->output, COMPRESSION_MAX_CHUNK);
+ }
+}
+
+void rrdpush_decompressor_destroy_brotli(struct decompressor_state *state) {
+ if (state->stream) {
+ BrotliDecoderDestroyInstance(state->stream);
+ state->stream = NULL;
+ }
+}
+
+size_t rrdpush_decompress_brotli(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+ if (unlikely(!state || !compressed_data || !compressed_size))
+ return 0;
+
+ // The state.output ring buffer is always EMPTY at this point,
+ // meaning that (state->output.read_pos == state->output.write_pos)
+ // However, THEY ARE NOT ZERO.
+
+ size_t available_out = state->output.size;
+ size_t available_in = compressed_size;
+ const uint8_t *next_in = (const uint8_t *)compressed_data;
+ uint8_t *next_out = (uint8_t *)state->output.data;
+
+ if (BrotliDecoderDecompressStream(state->stream, &available_in, &next_in, &available_out, &next_out, NULL) == BROTLI_DECODER_RESULT_ERROR) {
+ netdata_log_error("STREAM: Brotli decompression failed.");
+ return 0;
+ }
+
+ if(available_in != 0) {
+ netdata_log_error("STREAM: BrotliDecoderDecompressStream() did not use all the input buffer, %zu bytes out of %zu remain",
+ available_in, compressed_size);
+ return 0;
+ }
+
+ size_t decompressed_size = state->output.size - available_out;
+ if(available_out == 0) {
+ netdata_log_error("STREAM: BrotliDecoderDecompressStream() needs a bigger output buffer than the one we provided "
+ "(output buffer %zu bytes, compressed payload %zu bytes)",
+ state->output.size, compressed_size);
+ return 0;
+ }
+
+ if(decompressed_size == 0) {
+ netdata_log_error("STREAM: BrotliDecoderDecompressStream() did not produce any output from the input provided "
+ "(input buffer %zu bytes)",
+ compressed_size);
+ return 0;
+ }
+
+ state->output.read_pos = 0;
+ state->output.write_pos = decompressed_size;
+
+ state->total_compressed += compressed_size - available_in;
+ state->total_uncompressed += decompressed_size;
+ state->total_compressions++;
+
+ return decompressed_size;
+}
+
+#endif // ENABLE_BROTLI
diff --git a/streaming/compression_brotli.h b/streaming/compression_brotli.h
new file mode 100644
index 000000000..4955e5a82
--- /dev/null
+++ b/streaming/compression_brotli.h
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression.h"
+
+#ifndef NETDATA_STREAMING_COMPRESSION_BROTLI_H
+#define NETDATA_STREAMING_COMPRESSION_BROTLI_H
+
+void rrdpush_compressor_init_brotli(struct compressor_state *state);
+void rrdpush_compressor_destroy_brotli(struct compressor_state *state);
+size_t rrdpush_compress_brotli(struct compressor_state *state, const char *data, size_t size, const char **out);
+size_t rrdpush_decompress_brotli(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
+void rrdpush_decompressor_init_brotli(struct decompressor_state *state);
+void rrdpush_decompressor_destroy_brotli(struct decompressor_state *state);
+
+#endif //NETDATA_STREAMING_COMPRESSION_BROTLI_H
diff --git a/streaming/compression_gzip.c b/streaming/compression_gzip.c
new file mode 100644
index 000000000..c4ef3af05
--- /dev/null
+++ b/streaming/compression_gzip.c
@@ -0,0 +1,164 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression_gzip.h"
+#include <zlib.h>
+
+void rrdpush_compressor_init_gzip(struct compressor_state *state) {
+ if (!state->initialized) {
+ state->initialized = true;
+
+ // Initialize deflate stream
+ z_stream *strm = state->stream = (z_stream *) mallocz(sizeof(z_stream));
+ strm->zalloc = Z_NULL;
+ strm->zfree = Z_NULL;
+ strm->opaque = Z_NULL;
+
+ if(state->level < Z_BEST_SPEED)
+ state->level = Z_BEST_SPEED;
+
+ if(state->level > Z_BEST_COMPRESSION)
+ state->level = Z_BEST_COMPRESSION;
+
+ // int r = deflateInit2(strm, Z_BEST_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
+ int r = deflateInit2(strm, state->level, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
+ if (r != Z_OK) {
+ netdata_log_error("Failed to initialize deflate with error: %d", r);
+ freez(state->stream);
+ state->initialized = false;
+ return;
+ }
+
+ }
+}
+
+void rrdpush_compressor_destroy_gzip(struct compressor_state *state) {
+ if (state->stream) {
+ deflateEnd(state->stream);
+ freez(state->stream);
+ state->stream = NULL;
+ }
+}
+
+size_t rrdpush_compress_gzip(struct compressor_state *state, const char *data, size_t size, const char **out) {
+ if (unlikely(!state || !size || !out))
+ return 0;
+
+ simple_ring_buffer_make_room(&state->output, deflateBound(state->stream, size));
+
+ z_stream *strm = state->stream;
+ strm->avail_in = (uInt)size;
+ strm->next_in = (Bytef *)data;
+ strm->avail_out = (uInt)state->output.size;
+ strm->next_out = (Bytef *)state->output.data;
+
+ int ret = deflate(strm, Z_SYNC_FLUSH);
+ if (ret != Z_OK && ret != Z_STREAM_END) {
+ netdata_log_error("STREAM: deflate() failed with error %d", ret);
+ return 0;
+ }
+
+ if(strm->avail_in != 0) {
+ netdata_log_error("STREAM: deflate() did not use all the input buffer, %u bytes out of %zu remain",
+ strm->avail_in, size);
+ return 0;
+ }
+
+ if(strm->avail_out == 0) {
+ netdata_log_error("STREAM: deflate() needs a bigger output buffer than the one we provided "
+ "(output buffer %zu bytes, compressed payload %zu bytes)",
+ state->output.size, size);
+ return 0;
+ }
+
+ size_t compressed_data_size = state->output.size - strm->avail_out;
+
+ if(compressed_data_size == 0) {
+ netdata_log_error("STREAM: deflate() did not produce any output "
+ "(output buffer %zu bytes, compressed payload %zu bytes)",
+ state->output.size, size);
+ return 0;
+ }
+
+ state->sender_locked.total_compressions++;
+ state->sender_locked.total_uncompressed += size;
+ state->sender_locked.total_compressed += compressed_data_size;
+
+ *out = state->output.data;
+ return compressed_data_size;
+}
+
+void rrdpush_decompressor_init_gzip(struct decompressor_state *state) {
+ if (!state->initialized) {
+ state->initialized = true;
+
+ // Initialize inflate stream
+ z_stream *strm = state->stream = (z_stream *)mallocz(sizeof(z_stream));
+ strm->zalloc = Z_NULL;
+ strm->zfree = Z_NULL;
+ strm->opaque = Z_NULL;
+
+ int r = inflateInit2(strm, 15 + 16);
+ if (r != Z_OK) {
+ netdata_log_error("Failed to initialize inflateInit2() with error: %d", r);
+ freez(state->stream);
+ state->initialized = false;
+ return;
+ }
+
+ simple_ring_buffer_make_room(&state->output, COMPRESSION_MAX_CHUNK);
+ }
+}
+
+void rrdpush_decompressor_destroy_gzip(struct decompressor_state *state) {
+ if (state->stream) {
+ inflateEnd(state->stream);
+ freez(state->stream);
+ state->stream = NULL;
+ }
+}
+
+size_t rrdpush_decompress_gzip(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+ if (unlikely(!state || !compressed_data || !compressed_size))
+ return 0;
+
+ // The state.output ring buffer is always EMPTY at this point,
+ // meaning that (state->output.read_pos == state->output.write_pos)
+ // However, THEY ARE NOT ZERO.
+
+ z_stream *strm = state->stream;
+ strm->avail_in = (uInt)compressed_size;
+ strm->next_in = (Bytef *)compressed_data;
+ strm->avail_out = (uInt)state->output.size;
+ strm->next_out = (Bytef *)state->output.data;
+
+ int ret = inflate(strm, Z_SYNC_FLUSH);
+ if (ret != Z_STREAM_END && ret != Z_OK) {
+ netdata_log_error("RRDPUSH DECOMPRESS: inflate() failed with error %d", ret);
+ return 0;
+ }
+
+ if(strm->avail_in != 0) {
+ netdata_log_error("RRDPUSH DECOMPRESS: inflate() did not use all compressed data we provided "
+ "(compressed payload %zu bytes, remaining to be uncompressed %u)"
+ , compressed_size, strm->avail_in);
+ return 0;
+ }
+
+ if(strm->avail_out == 0) {
+ netdata_log_error("RRDPUSH DECOMPRESS: inflate() needs a bigger output buffer than the one we provided "
+ "(compressed payload %zu bytes, output buffer size %zu bytes)"
+ , compressed_size, state->output.size);
+ return 0;
+ }
+
+ size_t decompressed_size = state->output.size - strm->avail_out;
+
+ state->output.read_pos = 0;
+ state->output.write_pos = decompressed_size;
+
+ state->total_compressed += compressed_size;
+ state->total_uncompressed += decompressed_size;
+ state->total_compressions++;
+
+ return decompressed_size;
+}
diff --git a/streaming/compression_gzip.h b/streaming/compression_gzip.h
new file mode 100644
index 000000000..85f34bc6d
--- /dev/null
+++ b/streaming/compression_gzip.h
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression.h"
+
+#ifndef NETDATA_STREAMING_COMPRESSION_GZIP_H
+#define NETDATA_STREAMING_COMPRESSION_GZIP_H
+
+void rrdpush_compressor_init_gzip(struct compressor_state *state);
+void rrdpush_compressor_destroy_gzip(struct compressor_state *state);
+size_t rrdpush_compress_gzip(struct compressor_state *state, const char *data, size_t size, const char **out);
+size_t rrdpush_decompress_gzip(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
+void rrdpush_decompressor_init_gzip(struct decompressor_state *state);
+void rrdpush_decompressor_destroy_gzip(struct decompressor_state *state);
+
+#endif //NETDATA_STREAMING_COMPRESSION_GZIP_H
diff --git a/streaming/compression_lz4.c b/streaming/compression_lz4.c
new file mode 100644
index 000000000..f5174134e
--- /dev/null
+++ b/streaming/compression_lz4.c
@@ -0,0 +1,143 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression_lz4.h"
+
+#ifdef ENABLE_LZ4
+#include "lz4.h"
+
+// ----------------------------------------------------------------------------
+// compress
+
+void rrdpush_compressor_init_lz4(struct compressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+ state->stream = LZ4_createStream();
+
+ // LZ4 needs access to the last 64KB of source data
+ // so, we keep twice the size of each message
+ simple_ring_buffer_make_room(&state->input, 65536 + COMPRESSION_MAX_CHUNK * 2);
+ }
+}
+
+void rrdpush_compressor_destroy_lz4(struct compressor_state *state) {
+ if (state->stream) {
+ LZ4_freeStream(state->stream);
+ state->stream = NULL;
+ }
+}
+
+/*
+ * Compress the given block of data
+ * Compressed data will remain in the internal buffer until the next invocation
+ * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
+ * or 0 in case of error
+ */
+size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out) {
+ if(unlikely(!state || !size || !out))
+ return 0;
+
+ // we need to keep the last 64K of our previous source data
+ // as they were in the ring buffer
+
+ simple_ring_buffer_make_room(&state->output, LZ4_COMPRESSBOUND(size));
+
+ if(state->input.write_pos + size > state->input.size)
+ // the input buffer cannot fit out data, restart from zero
+ simple_ring_buffer_reset(&state->input);
+
+ simple_ring_buffer_append_data(&state->input, data, size);
+
+ long int compressed_data_size = LZ4_compress_fast_continue(
+ state->stream,
+ state->input.data + state->input.read_pos,
+ (char *)state->output.data,
+ (int)(state->input.write_pos - state->input.read_pos),
+ (int)state->output.size,
+ state->level);
+
+ if (compressed_data_size <= 0) {
+ netdata_log_error("STREAM: LZ4_compress_fast_continue() returned %ld "
+ "(source is %zu bytes, output buffer can fit %zu bytes)",
+ compressed_data_size, size, state->output.size);
+ return 0;
+ }
+
+ state->input.read_pos = state->input.write_pos;
+
+ state->sender_locked.total_compressions++;
+ state->sender_locked.total_uncompressed += size;
+ state->sender_locked.total_compressed += compressed_data_size;
+
+ *out = state->output.data;
+ return compressed_data_size;
+}
+
+// ----------------------------------------------------------------------------
+// decompress
+
+void rrdpush_decompressor_init_lz4(struct decompressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+ state->stream = LZ4_createStreamDecode();
+ simple_ring_buffer_make_room(&state->output, 65536 + COMPRESSION_MAX_CHUNK * 2);
+ }
+}
+
+void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state) {
+ if (state->stream) {
+ LZ4_freeStreamDecode(state->stream);
+ state->stream = NULL;
+ }
+}
+
+/*
+ * Decompress the compressed data in the internal buffer
+ * Return the size of uncompressed data or 0 for error
+ */
+size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+ if (unlikely(!state || !compressed_data || !compressed_size))
+ return 0;
+
+ // The state.output ring buffer is always EMPTY at this point,
+ // meaning that (state->output.read_pos == state->output.write_pos)
+ // However, THEY ARE NOT ZERO.
+
+ if (unlikely(state->output.write_pos + COMPRESSION_MAX_CHUNK > state->output.size))
+ // the input buffer cannot fit out data, restart from zero
+ simple_ring_buffer_reset(&state->output);
+
+ long int decompressed_size = LZ4_decompress_safe_continue(
+ state->stream
+ , compressed_data
+ , (char *)(state->output.data + state->output.write_pos)
+ , (int)compressed_size
+ , (int)(state->output.size - state->output.write_pos)
+ );
+
+ if (unlikely(decompressed_size < 0)) {
+ netdata_log_error("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() returned negative value: %ld "
+ "(compressed chunk is %zu bytes)"
+ , decompressed_size, compressed_size);
+ return 0;
+ }
+
+ if(unlikely(decompressed_size + state->output.write_pos > state->output.size))
+ fatal("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() overflown the stream_buffer "
+ "(size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu)"
+ , state->output.size
+ , state->output.write_pos
+ , decompressed_size
+ , (size_t)(state->output.write_pos + decompressed_size - state->output.size)
+ );
+
+ state->output.write_pos += decompressed_size;
+
+ // statistics
+ state->total_compressed += compressed_size;
+ state->total_uncompressed += decompressed_size;
+ state->total_compressions++;
+
+ return decompressed_size;
+}
+
+#endif // ENABLE_LZ4
diff --git a/streaming/compression_lz4.h b/streaming/compression_lz4.h
new file mode 100644
index 000000000..69f0fadcc
--- /dev/null
+++ b/streaming/compression_lz4.h
@@ -0,0 +1,19 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression.h"
+
+#ifndef NETDATA_STREAMING_COMPRESSION_LZ4_H
+#define NETDATA_STREAMING_COMPRESSION_LZ4_H
+
+#ifdef ENABLE_LZ4
+
+void rrdpush_compressor_init_lz4(struct compressor_state *state);
+void rrdpush_compressor_destroy_lz4(struct compressor_state *state);
+size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out);
+size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
+void rrdpush_decompressor_init_lz4(struct decompressor_state *state);
+void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state);
+
+#endif // ENABLE_LZ4
+
+#endif //NETDATA_STREAMING_COMPRESSION_LZ4_H
diff --git a/streaming/compression_zstd.c b/streaming/compression_zstd.c
new file mode 100644
index 000000000..dabc044f7
--- /dev/null
+++ b/streaming/compression_zstd.c
@@ -0,0 +1,163 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression_zstd.h"
+
+#ifdef ENABLE_ZSTD
+#include <zstd.h>
+
+void rrdpush_compressor_init_zstd(struct compressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+ state->stream = ZSTD_createCStream();
+
+ if(state->level < 1)
+ state->level = 1;
+
+ if(state->level > ZSTD_maxCLevel())
+ state->level = ZSTD_maxCLevel();
+
+ size_t ret = ZSTD_initCStream(state->stream, state->level);
+ if(ZSTD_isError(ret))
+ netdata_log_error("STREAM: ZSTD_initCStream() returned error: %s", ZSTD_getErrorName(ret));
+
+ // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_compressionLevel, 1);
+ // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_strategy, ZSTD_fast);
+ }
+}
+
+void rrdpush_compressor_destroy_zstd(struct compressor_state *state) {
+ if(state->stream) {
+ ZSTD_freeCStream(state->stream);
+ state->stream = NULL;
+ }
+}
+
+size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out) {
+ if(unlikely(!state || !size || !out))
+ return 0;
+
+ ZSTD_inBuffer inBuffer = {
+ .pos = 0,
+ .size = size,
+ .src = data,
+ };
+
+ size_t wanted_size = MAX(ZSTD_compressBound(inBuffer.size - inBuffer.pos), ZSTD_CStreamOutSize());
+ simple_ring_buffer_make_room(&state->output, wanted_size);
+
+ ZSTD_outBuffer outBuffer = {
+ .pos = 0,
+ .size = state->output.size,
+ .dst = (void *)state->output.data,
+ };
+
+ // compress
+ size_t ret = ZSTD_compressStream(state->stream, &outBuffer, &inBuffer);
+
+ // error handling
+ if(ZSTD_isError(ret)) {
+ netdata_log_error("STREAM: ZSTD_compressStream() return error: %s", ZSTD_getErrorName(ret));
+ return 0;
+ }
+
+ if(inBuffer.pos < inBuffer.size) {
+ netdata_log_error("STREAM: ZSTD_compressStream() left unprocessed input (source payload %zu bytes, consumed %zu bytes)",
+ inBuffer.size, inBuffer.pos);
+ return 0;
+ }
+
+ if(outBuffer.pos == 0) {
+ // ZSTD needs more input to flush the output, so let's flush it manually
+ ret = ZSTD_flushStream(state->stream, &outBuffer);
+
+ if(ZSTD_isError(ret)) {
+ netdata_log_error("STREAM: ZSTD_flushStream() return error: %s", ZSTD_getErrorName(ret));
+ return 0;
+ }
+
+ if(outBuffer.pos == 0) {
+ netdata_log_error("STREAM: ZSTD_compressStream() returned zero compressed bytes "
+ "(source is %zu bytes, output buffer can fit %zu bytes) "
+ , size, outBuffer.size);
+ return 0;
+ }
+ }
+
+ state->sender_locked.total_compressions++;
+ state->sender_locked.total_uncompressed += size;
+ state->sender_locked.total_compressed += outBuffer.pos;
+
+ // return values
+ *out = state->output.data;
+ return outBuffer.pos;
+}
+
+void rrdpush_decompressor_init_zstd(struct decompressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+ state->stream = ZSTD_createDStream();
+
+ size_t ret = ZSTD_initDStream(state->stream);
+ if(ZSTD_isError(ret))
+ netdata_log_error("STREAM: ZSTD_initDStream() returned error: %s", ZSTD_getErrorName(ret));
+
+ simple_ring_buffer_make_room(&state->output, MAX(COMPRESSION_MAX_CHUNK, ZSTD_DStreamOutSize()));
+ }
+}
+
+void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state) {
+ if (state->stream) {
+ ZSTD_freeDStream(state->stream);
+ state->stream = NULL;
+ }
+}
+
+size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+ if (unlikely(!state || !compressed_data || !compressed_size))
+ return 0;
+
+ // The state.output ring buffer is always EMPTY at this point,
+ // meaning that (state->output.read_pos == state->output.write_pos)
+ // However, THEY ARE NOT ZERO.
+
+ ZSTD_inBuffer inBuffer = {
+ .pos = 0,
+ .size = compressed_size,
+ .src = compressed_data,
+ };
+
+ ZSTD_outBuffer outBuffer = {
+ .pos = 0,
+ .dst = (char *)state->output.data,
+ .size = state->output.size,
+ };
+
+ size_t ret = ZSTD_decompressStream(
+ state->stream
+ , &outBuffer
+ , &inBuffer);
+
+ if(ZSTD_isError(ret)) {
+ netdata_log_error("STREAM: ZSTD_decompressStream() return error: %s", ZSTD_getErrorName(ret));
+ return 0;
+ }
+
+ if(inBuffer.pos < inBuffer.size)
+ fatal("RRDPUSH DECOMPRESS: ZSTD ZSTD_decompressStream() decompressed %zu bytes, "
+ "but %zu bytes of compressed data remain",
+ inBuffer.pos, inBuffer.size);
+
+ size_t decompressed_size = outBuffer.pos;
+
+ state->output.read_pos = 0;
+ state->output.write_pos = outBuffer.pos;
+
+ // statistics
+ state->total_compressed += compressed_size;
+ state->total_uncompressed += decompressed_size;
+ state->total_compressions++;
+
+ return decompressed_size;
+}
+
+#endif // ENABLE_ZSTD
diff --git a/streaming/compression_zstd.h b/streaming/compression_zstd.h
new file mode 100644
index 000000000..bfabbf89d
--- /dev/null
+++ b/streaming/compression_zstd.h
@@ -0,0 +1,19 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression.h"
+
+#ifndef NETDATA_STREAMING_COMPRESSION_ZSTD_H
+#define NETDATA_STREAMING_COMPRESSION_ZSTD_H
+
+#ifdef ENABLE_ZSTD
+
+void rrdpush_compressor_init_zstd(struct compressor_state *state);
+void rrdpush_compressor_destroy_zstd(struct compressor_state *state);
+size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out);
+size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
+void rrdpush_decompressor_init_zstd(struct decompressor_state *state);
+void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state);
+
+#endif // ENABLE_ZSTD
+
+#endif //NETDATA_STREAMING_COMPRESSION_ZSTD_H
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 10ef8b7d3..a12b94fb4 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
+#include "web/server/h2o/http_server.h"
extern struct config stream_config;
@@ -28,9 +29,7 @@ void receiver_state_free(struct receiver_state *rpt) {
close(rpt->fd);
}
-#ifdef ENABLE_RRDPUSH_COMPRESSION
rrdpush_decompressor_destroy(&rpt->decompressor);
-#endif
if(rpt->system_info)
rrdhost_system_info_free(rpt->system_info);
@@ -59,6 +58,11 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
return 0;
}
+#ifdef ENABLE_H2O
+ if (is_h2o_rrdpush(r))
+ return (int)h2o_stream_read(r->h2o_ctx, buffer, size);
+#endif
+
int tries = 100;
ssize_t bytes_read;
@@ -92,15 +96,44 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
return (int)bytes_read;
}
-static inline bool receiver_read_uncompressed(struct receiver_state *r) {
+static inline STREAM_HANDSHAKE read_stream_error_to_reason(int code) {
+ if(code > 0)
+ return 0;
+
+ switch(code) {
+ case 0:
+ // asked to read zero bytes
+ return STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER;
+
+ case -1:
+ // EOF
+ return STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF;
+
+ case -2:
+ // failed to read
+ return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED;
+
+ case -3:
+ // timeout
+ return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT;
+
+ default:
+ // anything else
+ return STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
+ }
+}
+
+static inline bool receiver_read_uncompressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) {
#ifdef NETDATA_INTERNAL_CHECKS
if(r->reader.read_buffer[r->reader.read_len] != '\0')
fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
#endif
int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
- if(unlikely(bytes_read <= 0))
+ if(unlikely(bytes_read <= 0)) {
+ *reason = read_stream_error_to_reason(bytes_read);
return false;
+ }
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read);
@@ -111,8 +144,7 @@ static inline bool receiver_read_uncompressed(struct receiver_state *r) {
return true;
}
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-static inline bool receiver_read_compressed(struct receiver_state *r) {
+static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) {
internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0',
"%s: read_buffer does not start with zero #2", __FUNCTION__ );
@@ -150,8 +182,10 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
int bytes_read = 0;
do {
int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read);
- if (unlikely(ret <= 0))
+ if (unlikely(ret <= 0)) {
+ *reason = read_stream_error_to_reason(ret);
return false;
+ }
bytes_read += ret;
} while(unlikely(bytes_read < (int)r->decompressor.signature_size));
@@ -187,7 +221,7 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
int last_read_bytes = read_stream(r, &compressed[start], remaining);
if (unlikely(last_read_bytes <= 0)) {
- internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes);
+ *reason = read_stream_error_to_reason(last_read_bytes);
return false;
}
@@ -217,57 +251,6 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
return true;
}
-#else // !ENABLE_RRDPUSH_COMPRESSION
-static inline bool receiver_read_compressed(struct receiver_state *r) {
- return receiver_read_uncompressed(r);
-}
-#endif // ENABLE_RRDPUSH_COMPRESSION
-
-/* Produce a full line if one exists, statefully return where we start next time.
- * When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
- */
-inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) {
- buffer_need_bytes(dst, reader->read_len - reader->pos + 2);
-
- size_t start = reader->pos;
-
- char *ss = &reader->read_buffer[start];
- char *se = &reader->read_buffer[reader->read_len];
- char *ds = &dst->buffer[dst->len];
- char *de = &ds[dst->size - dst->len - 2];
-
- if(ss >= se) {
- *ds = '\0';
- reader->pos = 0;
- reader->read_len = 0;
- reader->read_buffer[reader->read_len] = '\0';
- return false;
- }
-
- // copy all bytes to buffer
- while(ss < se && ds < de && *ss != '\n') {
- *ds++ = *ss++;
- dst->len++;
- }
-
- // if we have a newline, return the buffer
- if(ss < se && ds < de && *ss == '\n') {
- // newline found in the r->read_buffer
-
- *ds++ = *ss++; // copy the newline too
- dst->len++;
-
- *ds = '\0';
-
- reader->pos = ss - reader->read_buffer;
- return true;
- }
-
- reader->pos = 0;
- reader->read_len = 0;
- reader->read_buffer[reader->read_len] = '\0';
- return false;
-}
bool plugin_is_enabled(struct plugind *cd);
@@ -315,6 +298,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
}
+#ifdef ENABLE_H2O
+ parser->h2o_ctx = rpt->h2o_ctx;
+#endif
+
pluginsd_keywords_init(parser, PARSER_INIT_STREAMING);
rrd_collector_started();
@@ -323,43 +310,59 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- bool compressed_connection = false;
+ {
+ bool compressed_connection = rrdpush_decompression_initialize(rpt);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
- compressed_connection = true;
- rrdpush_decompressor_reset(&rpt->decompressor);
- }
- else
- rrdpush_decompressor_destroy(&rpt->decompressor);
+ buffered_reader_init(&rpt->reader);
+
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ {
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(
+ rpt->host) : "unknown"
+ );
+ parser->user.stream_log_fp = fopen(filename, "w");
+ parser->user.stream_log_repertoire = PARSER_REP_METADATA;
+ }
#endif
- buffered_reader_init(&rpt->reader);
+ CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- while(!receiver_should_stop(rpt)) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- if(!buffered_reader_next_line(&rpt->reader, buffer)) {
- bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt);
+ while(!receiver_should_stop(rpt)) {
- if(unlikely(!have_new_data)) {
- receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false);
- break;
- }
+ if(!buffered_reader_next_line(&rpt->reader, buffer)) {
+ STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
- continue;
- }
+ bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason)
+ : receiver_read_uncompressed(rpt, &reason);
- if (unlikely(parser_action(parser, buffer->buffer))) {
- receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
- break;
- }
+ if(unlikely(!have_new_data)) {
+ receiver_set_exit_reason(rpt, reason, false);
+ break;
+ }
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- buffer_free(buffer);
- result = parser->user.data_collections_count;
+ continue;
+ }
+
+ if(unlikely(parser_action(parser, buffer->buffer))) {
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
+ break;
+ }
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ result = parser->user.data_collections_count;
+ }
// free parser with the pop function
netdata_thread_cleanup_pop(1);
@@ -400,10 +403,10 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) {
if (rpt->config.alarms_delay > 0) {
host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay;
- netdata_log_health(
- "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
- rrdhost_hostname(host),
- (int64_t) rpt->config.alarms_delay);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
+ rrdhost_hostname(host),
+ (int64_t) rpt->config.alarms_delay);
}
}
@@ -521,26 +524,31 @@ static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *r
5);
}
-void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) {
-
- log_stream_connection(rpt->client_ip, rpt->client_port,
- (rpt->key && *rpt->key)? rpt->key : "-",
- (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-",
- (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-",
- status);
-
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
- "%s. "
- "STATUS: %s%s%s%s"
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- , msg
- , status
- , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
- , stream_handshake_error_to_string(rpt->exit.reason)
- , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
- );
-
+void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) {
+ // this function may be called BEFORE we spawn the receiver thread
+ // so, we need to add the fields again (it does not harm)
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
+ ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""),
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status),
+ ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_from_child_msgid),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ nd_log(NDLS_ACCESS, priority, "api_key:'%s' machine_guid:'%s' msg:'%s'"
+ , (rpt->key && *rpt->key)? rpt->key : ""
+ , (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : ""
+ , msg);
+
+ nd_log(NDLS_DAEMON, priority, "STREAM_RECEIVER for '%s': %s %s%s%s"
+ , (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""
+ , msg
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
+ , stream_handshake_error_to_string(rpt->exit.reason)
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
+ );
}
static void rrdpush_receive(struct receiver_state *rpt)
@@ -611,11 +619,19 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step);
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
rpt->config.rrdpush_compression = default_rrdpush_compression_enabled;
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression);
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression);
-#endif // ENABLE_RRDPUSH_COMPRESSION
+
+ bool is_ephemeral = false;
+ is_ephemeral = appconfig_get_boolean(&stream_config, rpt->key, "is ephemeral node", CONFIG_BOOLEAN_NO);
+ is_ephemeral = appconfig_get_boolean(&stream_config, rpt->machine_guid, "is ephemeral node", is_ephemeral);
+
+ if(rpt->config.rrdpush_compression) {
+ char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", RRDPUSH_COMPRESSION_ALGORITHMS_ORDER);
+ order = appconfig_get(&stream_config, rpt->machine_guid, "compression algorithms order", order);
+ rrdpush_parse_compression_order(rpt, order);
+ }
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
@@ -623,39 +639,46 @@ static void rrdpush_receive(struct receiver_state *rpt)
{
// this will also update the host with our system_info
RRDHOST *host = rrdhost_find_or_create(
- rpt->hostname
- , rpt->registry_hostname
- , rpt->machine_guid
- , rpt->os
- , rpt->timezone
- , rpt->abbrev_timezone
- , rpt->utc_offset
- , rpt->tags
- , rpt->program_name
- , rpt->program_version
- , rpt->config.update_every
- , rpt->config.history
- , rpt->config.mode
- , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO)
- , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key)
- , rpt->config.rrdpush_destination
- , rpt->config.rrdpush_api_key
- , rpt->config.rrdpush_send_charts_matching
- , rpt->config.rrdpush_enable_replication
- , rpt->config.rrdpush_seconds_to_replicate
- , rpt->config.rrdpush_replication_step
- , rpt->system_info
- , 0
- );
+ rpt->hostname,
+ rpt->registry_hostname,
+ rpt->machine_guid,
+ rpt->os,
+ rpt->timezone,
+ rpt->abbrev_timezone,
+ rpt->utc_offset,
+ rpt->tags,
+ rpt->program_name,
+ rpt->program_version,
+ rpt->config.update_every,
+ rpt->config.history,
+ rpt->config.mode,
+ (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO),
+ (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination &&
+ *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key &&
+ *rpt->config.rrdpush_api_key),
+ rpt->config.rrdpush_destination,
+ rpt->config.rrdpush_api_key,
+ rpt->config.rrdpush_send_charts_matching,
+ rpt->config.rrdpush_enable_replication,
+ rpt->config.rrdpush_seconds_to_replicate,
+ rpt->config.rrdpush_replication_step,
+ rpt->system_info,
+ 0);
if(!host) {
- rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt,"failed to find/create host structure, rejecting connection",
+ RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR);
goto cleanup;
}
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
- rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER");
+ rrdpush_receive_log_status(
+ rpt, "host is initializing, retry later",
+ RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, NDLP_NOTICE);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION);
goto cleanup;
}
@@ -664,7 +687,10 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->system_info = NULL;
if(!rrdhost_set_receiver(host, rpt)) {
- rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt, "host is already served by another receiver",
+ RRDPUSH_STATUS_DUPLICATE_RECEIVER, NDLP_INFO);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING);
goto cleanup;
}
@@ -709,12 +735,7 @@ static void rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
- if (!rpt->config.rrdpush_compression)
- rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
- }
-#endif // ENABLE_RRDPUSH_COMPRESSION
+ rrdpush_select_receiver_compression_algorithm(rpt);
{
// netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
@@ -737,19 +758,32 @@ static void rrdpush_receive(struct receiver_state *rpt)
}
netdata_log_debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
- ssize_t bytes_sent = send_timeout(
+#ifdef ENABLE_H2O
+ if (is_h2o_rrdpush(rpt)) {
+ h2o_stream_write(rpt->h2o_ctx, initial_response, strlen(initial_response));
+ } else {
+#endif
+ ssize_t bytes_sent = send_timeout(
#ifdef ENABLE_HTTPS
- &rpt->ssl,
+ &rpt->ssl,
#endif
- rpt->fd, initial_response, strlen(initial_response), 0, 60);
-
- if(bytes_sent != (ssize_t)strlen(initial_response)) {
- internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response));
- rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION");
- goto cleanup;
+ rpt->fd, initial_response, strlen(initial_response), 0, 60);
+
+ if(bytes_sent != (ssize_t)strlen(initial_response)) {
+ internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response));
+ rrdpush_receive_log_status(
+ rpt, "cannot reply back, dropping connection",
+ RRDPUSH_STATUS_CANT_REPLY, NDLP_ERR);
+ goto cleanup;
+ }
+#ifdef ENABLE_H2O
}
+#endif
}
+#ifdef ENABLE_H2O
+ unless_h2o_rrdpush(rpt)
+#endif
{
// remove the non-blocking flag from the socket
if(sock_delnonblock(rpt->fd) < 0)
@@ -770,17 +804,22 @@ static void rrdpush_receive(struct receiver_state *rpt)
, rpt->fd);
}
- rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED");
+ rrdpush_receive_log_status(
+ rpt, "connected and ready to receive data",
+ RRDPUSH_STATUS_CONNECTED, NDLP_INFO);
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
if (netdata_cloud_enabled)
- aclk_host_state_update(rpt->host, 1);
+ aclk_host_state_update(rpt->host, 1, 1);
#endif
rrdhost_set_is_parent_label();
+ if (is_ephemeral)
+ rrdhost_option_set(rpt->host, RRDHOST_OPTION_EPHEMERAL_HOST);
+
// let it reconnect to parent immediately
rrdpush_reset_destinations_postpone_time(rpt->host);
@@ -796,15 +835,17 @@ static void rrdpush_receive(struct receiver_state *rpt)
{
char msg[100 + 1];
- snprintfz(msg, 100, "disconnected (completed %zu updates)", count);
- rrdpush_receive_log_status(rpt, msg, "DISCONNECTED");
+ snprintfz(msg, sizeof(msg) - 1, "disconnected (completed %zu updates)", count);
+ rrdpush_receive_log_status(
+ rpt, msg,
+ RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING);
}
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// a child disconnected
if (netdata_cloud_enabled)
- aclk_host_state_update(rpt->host, 0);
+ aclk_host_state_update(rpt->host, 0, 1);
#endif
cleanup:
@@ -828,19 +869,64 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
rrdhost_set_is_parent_label();
}
+static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) {
+ struct receiver_state *rpt = ptr;
+ if(!rpt)
+ return false;
+
+ stream_capabilities_to_string(wb, rpt->capabilities);
+ return true;
+}
+
+static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) {
+ struct receiver_state *rpt = ptr;
+ if(!rpt)
+ return false;
+
+#ifdef ENABLE_HTTPS
+ buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http");
+#else
+ buffer_strcat(wb, "http");
+#endif
+ return true;
+}
+
void *rrdpush_receiver_thread(void *ptr) {
netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
- worker_register("STREAMRCV");
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT);
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT);
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
+ {
+ worker_register("STREAMRCV");
+
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ,
+ "received bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
+
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED,
+ "uncompressed bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
- struct receiver_state *rpt = (struct receiver_state *)ptr;
- rpt->tid = gettid();
- netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ "replication completion", "%",
+ WORKER_METRIC_ABSOLUTE);
- rrdpush_receive(rpt);
+ struct receiver_state *rpt = (struct receiver_state *) ptr;
+ rpt->tid = gettid();
+
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
+ ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname),
+ ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt),
+ ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip
+ , rpt->client_port);
+
+ rrdpush_receive(rpt);
+ }
netdata_thread_cleanup_pop(1);
return NULL;
diff --git a/streaming/replication.c b/streaming/replication.c
index ffb6b3def..bc34361b3 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -168,7 +168,7 @@ static struct replication_query *replication_query_prepare(
size_t count = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd)))
+ if (unlikely(!rd || !rd_dfe.item || !rrddim_check_upstream_exposed(rd)))
continue;
if (unlikely(rd_dfe.counter >= q->dimensions)) {
@@ -213,31 +213,38 @@ static struct replication_query *replication_query_prepare(
}
static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) {
- NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ bool with_slots = (capabilities & STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
RRDDIM *rd;
rrddim_foreach_read(rd, st){
- if (!rrddim_check_exposed(rd)) continue;
+ if (!rrddim_check_upstream_exposed(rd)) continue;
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
- sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
- (usec_t) rd->collector.last_collected_time.tv_usec);
+ buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
+ (usec_t) rd->collector.last_collected_time.tv_usec);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value);
+ buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value);
+ buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_calculated_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value);
+ buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_stored_value);
buffer_fast_strcat(wb, "\n", 1);
}
rrddim_foreach_done(rd);
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
- buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
+ buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
+ buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
buffer_fast_strcat(wb, "\n", 1);
}
@@ -313,7 +320,8 @@ static void replication_query_align_to_optimal_before(struct replication_query *
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
replication_query_align_to_optimal_before(q);
- NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
time_t after = q->query.after;
time_t before = q->query.before;
size_t dimensions = q->dimensions;
@@ -344,8 +352,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
if(max_skip <= 0) {
d->skip = true;
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
"STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
"beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
@@ -394,14 +402,15 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
fix_min_start_time = min_end_time - min_update_every;
#ifdef NETDATA_INTERNAL_CHECKS
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
- "misaligned dimensions, "
- "update every (min: %ld, max: %ld), "
- "start time (min: %ld, max: %ld), "
- "end time (min %ld, max %ld), "
- "now %ld, last end time sent %ld, "
- "min start time is fixed to %ld",
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "REPLAY WARNING: 'host:%s/chart:%s' "
+ "misaligned dimensions, "
+ "update every (min: %ld, max: %ld), "
+ "start time (min: %ld, max: %ld), "
+ "end time (min %ld, max %ld), "
+ "now %ld, last end time sent %ld, "
+ "min start time is fixed to %ld",
rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
min_update_every, max_update_every,
min_start_time, max_start_time,
@@ -444,12 +453,19 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
}
last_end_time_in_buffer = min_end_time;
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4);
- buffer_print_uint64_encoded(wb, encoding, min_start_time);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '' ", 4);
+ buffer_print_uint64_encoded(wb, integer_encoding, min_start_time);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, min_end_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, min_end_time);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
buffer_fast_strcat(wb, "\n", 1);
// output the replay values for this time
@@ -462,10 +478,17 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
!storage_point_is_unset(d->sp) &&
!storage_point_is_gap(d->sp))) {
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET, sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, d->rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " \"", 2);
buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
buffer_fast_strcat(wb, "\" ", 2);
- buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum);
+ buffer_print_netdata_double_encoded(wb, integer_encoding, d->sp.sum);
buffer_fast_strcat(wb, " ", 1);
buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED);
buffer_fast_strcat(wb, "\n", 1);
@@ -595,7 +618,8 @@ void replication_response_cancel_and_finalize(struct replication_query *q) {
static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
- NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
struct replication_request *rq = q->rq;
RRDSET *st = q->st;
RRDHOST *host = st->rrdhost;
@@ -605,12 +629,17 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// holding the host's buffer lock for too long
BUFFER *wb = sender_start(host->sender);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "'\n", 2);
-// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
-
bool locked_data_collection = q->query.locked_data_collection;
q->query.locked_data_collection = false;
@@ -634,19 +663,19 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// last end time of the data we sent
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
- buffer_print_int64_encoded(wb, encoding, st->update_every);
+ buffer_print_int64_encoded(wb, integer_encoding, st->update_every);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, db_first_entry);
+ buffer_print_uint64_encoded(wb, integer_encoding, db_first_entry);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, db_last_entry);
+ buffer_print_uint64_encoded(wb, integer_encoding, db_last_entry);
buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7);
- buffer_print_uint64_encoded(wb, encoding, after);
+ buffer_print_uint64_encoded(wb, integer_encoding, after);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, before);
+ buffer_print_uint64_encoded(wb, integer_encoding, before);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
buffer_fast_strcat(wb, "\n", 1);
worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
@@ -664,7 +693,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
if(!finished_with_gap)
- st->upstream_resync_time_s = 0;
+ st->rrdpush.sender.resync_time_s = 0;
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
@@ -729,8 +758,8 @@ static void replicate_log_request(struct replication_request_details *r, const c
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(true,
#else
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"REPLAY ERROR: 'host:%s/chart:%s' child sent: "
"db from %ld to %ld%s, wall clock time %ld, "
@@ -793,7 +822,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
#endif // NETDATA_LOG_REPLICATION_REQUESTS
char buffer[2048 + 1];
- snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
+ snprintfz(buffer, sizeof(buffer) - 1, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
(unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index a42bc13a0..7c1df2cad 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -39,9 +39,9 @@ struct config stream_config = {
};
unsigned int default_rrdpush_enabled = 0;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
+STREAM_CAPABILITIES globally_disabled_capabilities = STREAM_CAP_NONE;
+
unsigned int default_rrdpush_compression_enabled = 1;
-#endif
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
char *default_rrdpush_send_charts_matching = NULL;
@@ -57,53 +57,16 @@ static void load_stream_conf() {
errno = 0;
char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL)) {
- netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
+ nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load user config '%s'. Will try stock config.", filename);
freez(filename);
filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL))
- netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
+ nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
}
freez(filename);
}
-STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
-
- // we can have DATA_WITH_ML when INTERPOLATED is available
- bool ml_capability = true;
-
- if(host && sender) {
- // we have DATA_WITH_ML capability
- // we should remove the DATA_WITH_ML capability if our database does not have anomaly info
- // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
- netdata_mutex_lock(&host->receiver_lock);
-
- if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
- ml_capability = false;
-
- netdata_mutex_unlock(&host->receiver_lock);
- }
-
- return STREAM_CAP_V1 |
- STREAM_CAP_V2 |
- STREAM_CAP_VN |
- STREAM_CAP_VCAPS |
- STREAM_CAP_HLABELS |
- STREAM_CAP_CLAIM |
- STREAM_CAP_CLABELS |
- STREAM_CAP_FUNCTIONS |
- STREAM_CAP_REPLICATION |
- STREAM_CAP_BINARY |
- STREAM_CAP_INTERPOLATED |
- STREAM_HAS_COMPRESSION |
-#ifdef NETDATA_TEST_DYNCFG
- STREAM_CAP_DYNCFG |
-#endif
- (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
- (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
- 0;
-}
-
bool rrdpush_receiver_needs_dbengine() {
struct section *co;
@@ -145,13 +108,27 @@ int rrdpush_init() {
rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
"enable compression", default_rrdpush_compression_enabled);
-#endif
+
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_BROTLI] = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "brotli compression level",
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_BROTLI]);
+
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD] = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "zstd compression level",
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD]);
+
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4] = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "lz4 compression acceleration",
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4]);
+
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP] = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "gzip compression level",
+ rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP]);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
- netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing.");
+ nd_log_daemon(NDLP_WARNING, "STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
}
@@ -159,7 +136,7 @@ int rrdpush_init() {
netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate);
if(!netdata_ssl_validate_certificate_sender)
- netdata_log_info("SSL: streaming senders will skip SSL certificates verification.");
+ nd_log_daemon(NDLP_NOTICE, "SSL: streaming senders will skip SSL certificates verification.");
netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
@@ -245,11 +222,13 @@ static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
// Send the current chart definition.
// Assumes that collector thread has already called sender_start for mutex / buffer state.
static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
- bool replication_progress = false;
+ uint32_t version = rrdset_metadata_version(st);
RRDHOST *host = st->rrdhost;
+ NUMBER_ENCODING integer_encoding = stream_has_capability(host->sender, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ bool with_slots = stream_has_capability(host->sender, STREAM_CAP_SLOTS) ? true : false;
- rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ bool replication_progress = false;
// properly set the name for the remote end to parse it
char *name = "";
@@ -264,10 +243,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
}
}
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_CHART, sizeof(PLUGINSD_KEYWORD_CHART) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
+ }
+
// send the chart
buffer_sprintf(
wb
- , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
+ , " \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, rrdset_id(st)
, name
, rrdset_title(st)
@@ -292,19 +278,25 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
// send the dimensions
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_DIMENSION, sizeof(PLUGINSD_KEYWORD_DIMENSION) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
buffer_sprintf(
- wb
- , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n"
- , rrddim_id(rd)
- , rrddim_name(rd)
- , rrd_algorithm_name(rd->algorithm)
- , rd->multiplier
- , rd->divisor
- , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
- , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
- , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
+ wb
+ , " \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n"
+ , rrddim_id(rd)
+ , rrddim_name(rd)
+ , rrd_algorithm_name(rd->algorithm)
+ , rd->multiplier
+ , rd->divisor
+ , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
- rrddim_set_exposed(rd);
}
rrddim_foreach_done(rd);
@@ -339,7 +331,17 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
#endif
}
- st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ // we can set the exposed flag, after we commit the buffer
+ // because replication may pick it up prematurely
+ rrddim_foreach_read(rd, st) {
+ rrddim_metadata_exposed_upstream(rd, version);
+ }
+ rrddim_foreach_done(rd);
+ rrdset_metadata_exposed_upstream(st, version);
+
+ st->rrdpush.sender.resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
return replication_progress;
}
@@ -349,7 +351,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "\" ", 2);
- if(st->last_collected_time.tv_sec > st->upstream_resync_time_s)
+ if(st->last_collected_time.tv_sec > st->rrdpush.sender.resync_time_s)
buffer_print_uint64(wb, st->usec_since_last_update);
else
buffer_fast_strcat(wb, "0", 1);
@@ -361,7 +363,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
if(unlikely(!rrddim_check_updated(rd)))
continue;
- if(likely(rrddim_check_exposed(rd))) {
+ if(likely(rrddim_check_upstream_exposed_collector(rd))) {
buffer_fast_strcat(wb, "SET \"", 5);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "\" = ", 4);
@@ -372,7 +374,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
// we will include it in the next iteration
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrddim_metadata_updated(rd);
}
}
rrddim_foreach_done(rd);
@@ -390,12 +392,12 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
RRDHOST *host = st->rrdhost;
if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
- || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST))))
+ || !should_send_chart_matching(st, rrdset_flag_get(st)))) {
return false;
+ }
BUFFER *wb = sender_start(host->sender);
rrdpush_send_chart_definition(wb, st);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
return true;
@@ -410,6 +412,7 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags))
return;
+ bool with_slots = stream_has_capability(rsb, STREAM_CAP_SLOTS) ? true : false;
NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
BUFFER *wb = rsb->wb;
@@ -419,7 +422,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
if(unlikely(rsb->begin_v2_added))
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id));
buffer_fast_strcat(wb, "' ", 2);
buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every);
@@ -436,7 +446,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
rsb->begin_v2_added = true;
}
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
@@ -485,11 +502,14 @@ void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, cons
BUFFER *wb = sender_start(host->sender);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
- if (job->reason)
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
+
+ if (job->reason && strlen(job->reason))
buffer_sprintf(wb, " \"%s\"", job->reason);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ buffer_strcat(wb, "\n");
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
sender_thread_buffer_free();
@@ -503,7 +523,7 @@ void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char
buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
sender_thread_buffer_free();
}
@@ -522,24 +542,24 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
- netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+ nd_log_daemon(NDLP_NOTICE, "STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
}
return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
}
else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
- netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
+ nd_log_daemon(NDLP_INFO, "STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
BUFFER *wb = sender_start(host->sender);
rrd_functions_expose_global_rrdpush(host, wb);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
}
- RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
- bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
+ bool exposed_upstream = rrdset_check_upstream_exposed(st);
+ RRDSET_FLAGS rrdset_flags = rrdset_flag_get(st);
bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
if(unlikely((exposed_upstream && replication_in_progress) ||
@@ -549,7 +569,6 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(!exposed_upstream)) {
BUFFER *wb = sender_start(host->sender);
replication_in_progress = rrdpush_send_chart_definition(wb, st);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
}
if(replication_in_progress)
@@ -597,7 +616,7 @@ void rrdpush_send_global_functions(RRDHOST *host) {
rrd_functions_expose_global_rrdpush(host, wb);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
sender_thread_buffer_free();
}
@@ -630,7 +649,7 @@ void rrdpush_send_dyncfg(RRDHOST *host) {
}
dfe_done(plug);
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
sender_thread_buffer_free();
}
@@ -656,7 +675,7 @@ void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, cons
buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type));
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
sender_thread_buffer_free();
}
@@ -669,6 +688,19 @@ void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const c
buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name)
+{
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name);
+
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
@@ -709,11 +741,9 @@ int connect_to_one_of_destinations(
if(d->postpone_reconnection_until > now)
continue;
- internal_error(true,
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
"STREAM %s: connecting to '%s' (default port: %d)...",
- rrdhost_hostname(host),
- string2str(d->destination),
- default_port);
+ rrdhost_hostname(host), string2str(d->destination), default_port);
if (reconnects_counter)
*reconnects_counter += 1;
@@ -766,7 +796,7 @@ bool destinations_init_add_one(char *entry, void *data) {
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next);
t->count++;
- netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
+ nd_log_daemon(NDLP_INFO, "STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
return false; // we return false, so that we will get all defined destinations
}
@@ -835,11 +865,6 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai
// ----------------------------------------------------------------------------
// rrdpush receiver thread
-void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
- netdata_log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
-}
-
-
static void rrdpush_sender_thread_spawn(RRDHOST *host) {
sender_lock(host->sender);
@@ -848,7 +873,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
- netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
+ nd_log_daemon(NDLP_ERR, "STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
}
@@ -898,16 +923,21 @@ static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struc
}
void *rrdpush_receiver_thread(void *ptr);
-int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) {
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx) {
if(!service_running(ABILITY_STREAMING_CONNECTIONS))
return rrdpush_receiver_too_busy_now(w);
struct receiver_state *rpt = callocz(1, sizeof(*rpt));
rpt->last_msg_t = now_monotonic_sec();
- rpt->capabilities = STREAM_CAP_INVALID;
rpt->hops = 1;
+ rpt->capabilities = STREAM_CAP_INVALID;
+
+#ifdef ENABLE_H2O
+ rpt->h2o_ctx = h2o_ctx;
+#endif
+
__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
@@ -1003,7 +1033,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false);
if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
+ nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: "
"request has parameter '%s' = '%s', which is not used."
, (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
, rpt->client_ip, rpt->client_port
@@ -1032,9 +1062,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!rpt->key || !*rpt->key) {
rrdpush_receive_log_status(
- rpt,
- "request without an API key",
- "NO API KEY PERMISSION DENIED");
+ rpt, "request without an API key, rejecting connection",
+ RRDPUSH_STATUS_NO_API_KEY, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1042,9 +1071,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!rpt->hostname || !*rpt->hostname) {
rrdpush_receive_log_status(
- rpt,
- "request without a hostname",
- "NO HOSTNAME PERMISSION DENIED");
+ rpt, "request without a hostname, rejecting connection",
+ RRDPUSH_STATUS_NO_HOSTNAME, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1055,9 +1083,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!rpt->machine_guid || !*rpt->machine_guid) {
rrdpush_receive_log_status(
- rpt,
- "request without a machine GUID",
- "NO MACHINE GUID PERMISSION DENIED");
+ rpt, "request without a machine GUID, rejecting connection",
+ RRDPUSH_STATUS_NO_MACHINE_GUID, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1068,9 +1095,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (regenerate_guid(rpt->key, buf) == -1) {
rrdpush_receive_log_status(
- rpt,
- "API key is not a valid UUID (use the command uuidgen to generate one)",
- "INVALID API KEY PERMISSION DENIED");
+ rpt, "API key is not a valid UUID (use the command uuidgen to generate one)",
+ RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1078,9 +1104,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (regenerate_guid(rpt->machine_guid, buf) == -1) {
rrdpush_receive_log_status(
- rpt,
- "machine GUID is not a valid UUID",
- "INVALID MACHINE GUID PERMISSION DENIED");
+ rpt, "machine GUID is not a valid UUID",
+ RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1091,9 +1116,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!api_key_type || !*api_key_type) api_key_type = "unknown";
if(strcmp(api_key_type, "api") != 0) {
rrdpush_receive_log_status(
- rpt,
- "API key is a machine GUID",
- "INVALID API KEY PERMISSION DENIED");
+ rpt, "API key is a machine GUID",
+ RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1101,9 +1125,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) {
rrdpush_receive_log_status(
- rpt,
- "API key is not enabled",
- "API KEY DISABLED PERMISSION DENIED");
+ rpt, "API key is not enabled",
+ RRDPUSH_STATUS_API_KEY_DISABLED, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1119,9 +1142,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
simple_pattern_free(key_allow_from);
rrdpush_receive_log_status(
- rpt,
- "API key is not allowed from this IP",
- "NOT ALLOWED IP PERMISSION DENIED");
+ rpt, "API key is not allowed from this IP",
+ RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1137,9 +1159,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (strcmp(machine_guid_type, "machine") != 0) {
rrdpush_receive_log_status(
- rpt,
- "machine GUID is an API key",
- "INVALID MACHINE GUID PERMISSION DENIED");
+ rpt, "machine GUID is an API key",
+ RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1148,9 +1169,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) {
rrdpush_receive_log_status(
- rpt,
- "machine GUID is not enabled",
- "MACHINE GUID DISABLED PERMISSION DENIED");
+ rpt, "machine GUID is not enabled",
+ RRDPUSH_STATUS_MACHINE_GUID_DISABLED, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1166,9 +1186,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
simple_pattern_free(machine_allow_from);
rrdpush_receive_log_status(
- rpt,
- "machine GUID is not allowed from this IP",
- "NOT ALLOWED IP PERMISSION DENIED");
+ rpt, "machine GUID is not allowed from this IP",
+ RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1183,9 +1202,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rrdpush_receiver_takeover_web_connection(w, rpt);
rrdpush_receive_log_status(
- rpt,
- "machine GUID is my own",
- "LOCALHOST PERMISSION DENIED");
+ rpt, "machine GUID is my own",
+ RRDPUSH_STATUS_LOCALHOST, NDLP_DEBUG);
char initial_response[HTTP_HEADER_SIZE + 1];
snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
@@ -1196,11 +1214,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
#endif
rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
- netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
- "failed to reply."
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- );
+ nd_log_daemon(NDLP_ERR, "STREAM '%s' [receive from [%s]:%s]: "
+ "failed to reply."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
}
receiver_state_free(rpt);
@@ -1221,14 +1239,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
spinlock_unlock(&spinlock);
char msg[100 + 1];
- snprintfz(msg, 100,
+ snprintfz(msg, sizeof(msg) - 1,
"rate limit, will accept new connection in %ld secs",
(long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
rrdpush_receive_log_status(
- rpt,
- msg,
- "RATE LIMIT TRY LATER");
+ rpt, msg,
+ RRDPUSH_STATUS_RATE_LIMIT, NDLP_NOTICE);
receiver_state_free(rpt);
return rrdpush_receiver_too_busy_now(w);
@@ -1276,29 +1293,26 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
// we can proceed with this connection
receiver_stale = false;
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
- "stopped previous stale receiver to accept this one."
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- );
+ nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: "
+ "stopped previous stale receiver to accept this one."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
}
if (receiver_working || receiver_stale) {
// another receiver is already connected
// try again later
-#ifdef NETDATA_INTERNAL_CHECKS
char msg[200 + 1];
- snprintfz(msg, 200,
+ snprintfz(msg, sizeof(msg) - 1,
"multiple connections for same host, "
- "old connection was used %ld secs ago%s",
+ "old connection was last used %ld secs ago%s",
age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)");
rrdpush_receive_log_status(
- rpt,
- msg,
- "ALREADY CONNECTED");
-#endif
+ rpt, msg,
+ RRDPUSH_STATUS_ALREADY_CONNECTED, NDLP_DEBUG);
// Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
buffer_flush(w->response.data);
@@ -1308,8 +1322,6 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
}
- netdata_log_debug(D_SYSTEM, "starting STREAM receive thread.");
-
rrdpush_receiver_takeover_web_connection(w, rpt);
char tag[NETDATA_THREAD_TAG_MAX + 1];
@@ -1318,9 +1330,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
rrdpush_receive_log_status(
- rpt,
- "can't create receiver thread",
- "INTERNAL SERVER ERROR");
+ rpt, "can't create receiver thread",
+ RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR);
buffer_flush(w->response.data);
buffer_strcat(w->response.data, "Can't handle this request");
@@ -1364,11 +1375,15 @@ static struct {
{ STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" },
{ STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" },
{ STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" },
- { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" },
+ {STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR, "DISCONNECTED UNKNOWN SOCKET READ ERROR" },
{ STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" },
{ STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" },
{ STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" },
{ STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" },
+ { STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER, "DISCONNECTED NOT SUFFICIENT READ BUFFER" },
+ {STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF, "DISCONNECTED SOCKET EOF" },
+ {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED, "DISCONNECTED SOCKET READ FAILED" },
+ {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT, "DISCONNECTED SOCKET READ TIMEOUT" },
{ 0, NULL },
};
@@ -1389,25 +1404,29 @@ static struct {
STREAM_CAPABILITIES cap;
const char *str;
} capability_names[] = {
- { STREAM_CAP_V1, "V1" },
- { STREAM_CAP_V2, "V2" },
- { STREAM_CAP_VN, "VN" },
- { STREAM_CAP_VCAPS, "VCAPS" },
- { STREAM_CAP_HLABELS, "HLABELS" },
- { STREAM_CAP_CLAIM, "CLAIM" },
- { STREAM_CAP_CLABELS, "CLABELS" },
- { STREAM_CAP_COMPRESSION, "COMPRESSION" },
- { STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
- { STREAM_CAP_REPLICATION, "REPLICATION" },
- { STREAM_CAP_BINARY, "BINARY" },
- { STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
- { STREAM_CAP_IEEE754, "IEEE754" },
- { STREAM_CAP_DATA_WITH_ML, "ML" },
- { STREAM_CAP_DYNCFG, "DYN_CFG" },
- { 0 , NULL },
+ {STREAM_CAP_V1, "V1" },
+ {STREAM_CAP_V2, "V2" },
+ {STREAM_CAP_VN, "VN" },
+ {STREAM_CAP_VCAPS, "VCAPS" },
+ {STREAM_CAP_HLABELS, "HLABELS" },
+ {STREAM_CAP_CLAIM, "CLAIM" },
+ {STREAM_CAP_CLABELS, "CLABELS" },
+ {STREAM_CAP_LZ4, "LZ4" },
+ {STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
+ {STREAM_CAP_REPLICATION, "REPLICATION" },
+ {STREAM_CAP_BINARY, "BINARY" },
+ {STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
+ {STREAM_CAP_IEEE754, "IEEE754" },
+ {STREAM_CAP_DATA_WITH_ML, "ML" },
+ {STREAM_CAP_DYNCFG, "DYNCFG" },
+ {STREAM_CAP_SLOTS, "SLOTS" },
+ {STREAM_CAP_ZSTD, "ZSTD" },
+ {STREAM_CAP_GZIP, "GZIP" },
+ {STREAM_CAP_BROTLI, "BROTLI" },
+ {0 , NULL },
};
-static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
+void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
for(size_t i = 0; capability_names[i].str ; i++) {
if(caps & capability_names[i].cap) {
buffer_strcat(wb, capability_names[i].str);
@@ -1434,8 +1453,8 @@ void log_receiver_capabilities(struct receiver_state *rpt) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, rpt->capabilities);
- netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
- rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
+ nd_log_daemon(NDLP_INFO, "STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
buffer_free(wb);
}
@@ -1444,12 +1463,51 @@ void log_sender_capabilities(struct sender_state *s) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, s->capabilities);
- netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
- rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
+ nd_log_daemon(NDLP_INFO, "STREAM %s [send to %s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
buffer_free(wb);
}
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
+ STREAM_CAPABILITIES disabled_capabilities = globally_disabled_capabilities;
+
+ if(host && sender) {
+ // we have DATA_WITH_ML capability
+ // we should remove the DATA_WITH_ML capability if our database does not have anomaly info
+ // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
+ netdata_mutex_lock(&host->receiver_lock);
+
+ if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
+ disabled_capabilities |= STREAM_CAP_DATA_WITH_ML;
+
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ if(host->sender)
+ disabled_capabilities |= host->sender->disabled_capabilities;
+ }
+
+ return (STREAM_CAP_V1 |
+ STREAM_CAP_V2 |
+ STREAM_CAP_VN |
+ STREAM_CAP_VCAPS |
+ STREAM_CAP_HLABELS |
+ STREAM_CAP_CLAIM |
+ STREAM_CAP_CLABELS |
+ STREAM_CAP_FUNCTIONS |
+ STREAM_CAP_REPLICATION |
+ STREAM_CAP_BINARY |
+ STREAM_CAP_INTERPOLATED |
+ STREAM_CAP_SLOTS |
+ STREAM_CAP_COMPRESSIONS_AVAILABLE |
+ #ifdef NETDATA_TEST_DYNCFG
+ STREAM_CAP_DYNCFG |
+ #endif
+ STREAM_CAP_IEEE754 |
+ STREAM_CAP_DATA_WITH_ML |
+ 0) & ~disabled_capabilities;
+}
+
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) {
STREAM_CAPABILITIES caps = 0;
@@ -1457,7 +1515,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH
else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS;
else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM;
else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS;
- else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION;
+ else if(version <= STREAM_OLD_VERSION_LZ4) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_CAP_LZ4_AVAILABLE;
else caps = version;
if(caps & STREAM_CAP_VCAPS)
@@ -1479,8 +1537,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH
}
int32_t stream_capabilities_to_vn(uint32_t caps) {
- if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION;
+ if(caps & STREAM_CAP_LZ4) return STREAM_OLD_VERSION_LZ4;
if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS;
return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM)
}
-
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index c3c14233f..1459c881e 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -18,12 +18,14 @@
#define STREAM_OLD_VERSION_CLAIM 3
#define STREAM_OLD_VERSION_CLABELS 4
-#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
+#define STREAM_OLD_VERSION_LZ4 5
// ----------------------------------------------------------------------------
// capabilities negotiation
typedef enum {
+ STREAM_CAP_NONE = 0,
+
// do not use the first 3 bits
// they used to be versions 1, 2 and 3
// before we introduce capabilities
@@ -38,7 +40,7 @@ typedef enum {
STREAM_CAP_HLABELS = (1 << 7), // host labels supported
STREAM_CAP_CLAIM = (1 << 8), // claiming supported
STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
- STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported
+ STREAM_CAP_LZ4 = (1 << 10), // lz4 compression supported
STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
STREAM_CAP_REPLICATION = (1 << 12), // replication supported
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
@@ -46,22 +48,47 @@ typedef enum {
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming
+ STREAM_CAP_SLOTS = (1 << 18), // the sender can appoint a unique slot for each chart
+ STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported
+ STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported
+ STREAM_CAP_BROTLI = (1 << 21), // BROTLI compression supported
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
// needed for negotiating errors between parent and child
} STREAM_CAPABILITIES;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
+#ifdef ENABLE_LZ4
+#define STREAM_CAP_LZ4_AVAILABLE STREAM_CAP_LZ4
+#else
+#define STREAM_CAP_LZ4_AVAILABLE 0
+#endif // ENABLE_LZ4
+
+#ifdef ENABLE_ZSTD
+#define STREAM_CAP_ZSTD_AVAILABLE STREAM_CAP_ZSTD
#else
-#define STREAM_HAS_COMPRESSION 0
-#endif // ENABLE_RRDPUSH_COMPRESSION
+#define STREAM_CAP_ZSTD_AVAILABLE 0
+#endif // ENABLE_ZSTD
+
+#ifdef ENABLE_BROTLI
+#define STREAM_CAP_BROTLI_AVAILABLE STREAM_CAP_BROTLI
+#else
+#define STREAM_CAP_BROTLI_AVAILABLE 0
+#endif // ENABLE_BROTLI
+
+#define STREAM_CAP_COMPRESSIONS_AVAILABLE (STREAM_CAP_LZ4_AVAILABLE|STREAM_CAP_ZSTD_AVAILABLE|STREAM_CAP_BROTLI_AVAILABLE|STREAM_CAP_GZIP)
+
+extern STREAM_CAPABILITIES globally_disabled_capabilities;
STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
+static inline bool stream_has_more_than_one_capability_of(STREAM_CAPABILITIES caps, STREAM_CAPABILITIES mask) {
+ STREAM_CAPABILITIES common = (STREAM_CAPABILITIES)(caps & mask);
+ return (common & (common - 1)) != 0 && common != 0;
+}
+
// ----------------------------------------------------------------------------
// stream handshake
@@ -79,6 +106,31 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later."
#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
+#define RRDPUSH_STATUS_CONNECTED "CONNECTED"
+#define RRDPUSH_STATUS_ALREADY_CONNECTED "ALREADY CONNECTED"
+#define RRDPUSH_STATUS_DISCONNECTED "DISCONNECTED"
+#define RRDPUSH_STATUS_RATE_LIMIT "RATE LIMIT TRY LATER"
+#define RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS "INITIALIZATION IN PROGRESS RETRY LATER"
+#define RRDPUSH_STATUS_INTERNAL_SERVER_ERROR "INTERNAL SERVER ERROR DROPPING CONNECTION"
+#define RRDPUSH_STATUS_DUPLICATE_RECEIVER "DUPLICATE RECEIVER DROPPING CONNECTION"
+#define RRDPUSH_STATUS_CANT_REPLY "CANT REPLY DROPPING CONNECTION"
+#define RRDPUSH_STATUS_NO_HOSTNAME "NO HOSTNAME PERMISSION DENIED"
+#define RRDPUSH_STATUS_NO_API_KEY "NO API KEY PERMISSION DENIED"
+#define RRDPUSH_STATUS_INVALID_API_KEY "INVALID API KEY PERMISSION DENIED"
+#define RRDPUSH_STATUS_NO_MACHINE_GUID "NO MACHINE GUID PERMISSION DENIED"
+#define RRDPUSH_STATUS_MACHINE_GUID_DISABLED "MACHINE GUID DISABLED PERMISSION DENIED"
+#define RRDPUSH_STATUS_INVALID_MACHINE_GUID "INVALID MACHINE GUID PERMISSION DENIED"
+#define RRDPUSH_STATUS_API_KEY_DISABLED "API KEY DISABLED PERMISSION DENIED"
+#define RRDPUSH_STATUS_NOT_ALLOWED_IP "NOT ALLOWED IP PERMISSION DENIED"
+#define RRDPUSH_STATUS_LOCALHOST "LOCALHOST PERMISSION DENIED"
+#define RRDPUSH_STATUS_PERMISSION_DENIED "PERMISSION DENIED"
+#define RRDPUSH_STATUS_BAD_HANDSHAKE "BAD HANDSHAKE"
+#define RRDPUSH_STATUS_TIMEOUT "TIMEOUT"
+#define RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION "CANT UPGRADE CONNECTION"
+#define RRDPUSH_STATUS_SSL_ERROR "SSL ERROR"
+#define RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE "INVALID SSL CERTIFICATE"
+#define RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION "CANT ESTABLISH SSL CONNECTION"
+
typedef enum {
STREAM_HANDSHAKE_OK_V3 = 3, // v3+
STREAM_HANDSHAKE_OK_V2 = 2, // v2
@@ -101,11 +153,16 @@ typedef enum {
STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15,
STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16,
STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17,
- STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18,
+ STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR = -18,
STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19,
STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20,
STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21,
STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22,
+ STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER = -23,
+ STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF = -24,
+ STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED = -25,
+ STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT = -26,
+ STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE = -27,
} STREAM_HANDSHAKE;
@@ -120,100 +177,7 @@ typedef struct {
char *kernel_version;
} stream_encoded_t;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-// signature MUST end with a newline
-#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
-#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
-#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4
-
-struct compressor_state {
- bool initialized;
- char *compression_result_buffer;
- size_t compression_result_buffer_size;
- struct {
- void *lz4_stream;
- char *input_ring_buffer;
- size_t input_ring_buffer_size;
- size_t input_ring_buffer_pos;
- } stream;
- size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
- void (*destroy)(struct compressor_state **state);
-};
-
-void rrdpush_compressor_reset(struct compressor_state *state);
-void rrdpush_compressor_destroy(struct compressor_state *state);
-size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out);
-
-struct decompressor_state {
- bool initialized;
- size_t signature_size;
- size_t total_compressed;
- size_t total_uncompressed;
- size_t packet_count;
- struct {
- void *lz4_stream;
- char *buffer;
- size_t size;
- size_t write_at;
- size_t read_at;
- } stream;
-};
-
-void rrdpush_decompressor_destroy(struct decompressor_state *state);
-void rrdpush_decompressor_reset(struct decompressor_state *state);
-size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
-
-static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) {
- if (unlikely(!data || !data_size))
- return 0;
-
- if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
- return 0;
-
- uint32_t sign = *(uint32_t *)data;
- if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
- return 0;
-
- size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
- return length;
-}
-
-static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
- if(unlikely(state->stream.read_at != state->stream.write_at))
- fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
-
- return rrdpush_decompress_decode_header(header, header_size);
-}
-
-static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
- if(unlikely(state->stream.read_at > state->stream.write_at))
- fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
-
- return state->stream.write_at - state->stream.read_at;
-}
-
-static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
- if (unlikely(!state || !size || !dst))
- return 0;
-
- size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
-
- if(unlikely(!remaining))
- return 0;
-
- size_t bytes_to_return = size;
- if(bytes_to_return > remaining)
- bytes_to_return = remaining;
-
- memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return);
- state->stream.read_at += bytes_to_return;
-
- if(unlikely(state->stream.read_at > state->stream.write_at))
- fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
-
- return bytes_to_return;
-}
-#endif
+#include "compression.h"
// Thread-local storage
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
@@ -223,6 +187,7 @@ typedef enum __attribute__((packed)) {
STREAM_TRAFFIC_TYPE_FUNCTIONS,
STREAM_TRAFFIC_TYPE_METADATA,
STREAM_TRAFFIC_TYPE_DATA,
+ STREAM_TRAFFIC_TYPE_DYNCFG,
// terminator
STREAM_TRAFFIC_TYPE_MAX,
@@ -230,7 +195,6 @@ typedef enum __attribute__((packed)) {
typedef enum __attribute__((packed)) {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
- SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
struct function_payload_state {
@@ -263,6 +227,7 @@ struct sender_state {
char read_buffer[PLUGINSD_LINE_MAX + 1];
ssize_t read_len;
STREAM_CAPABILITIES capabilities;
+ STREAM_CAPABILITIES disabled_capabilities;
size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
@@ -274,9 +239,12 @@ struct sender_state {
uint16_t hops;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
+ struct line_splitter line;
struct compressor_state compressor;
-#endif // ENABLE_RRDPUSH_COMPRESSION
+
+#ifdef NETDATA_LOG_STREAM_SENDER
+ FILE *stream_log_fp;
+#endif
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl; // structure used to encrypt the connection
@@ -306,6 +274,8 @@ struct sender_state {
usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
} atomic;
+
+ int parent_using_h2o;
};
#define sender_lock(sender) spinlock_lock(&(sender)->spinlock)
@@ -362,19 +332,6 @@ typedef struct stream_node_instance {
} STREAM_NODE_INSTANCE;
*/
-struct buffered_reader {
- ssize_t read_len;
- ssize_t pos;
- char read_buffer[PLUGINSD_LINE_MAX + 1];
-};
-
-bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst);
-static inline void buffered_reader_init(struct buffered_reader *reader) {
- reader->read_buffer[0] = '\0';
- reader->read_len = 0;
- reader->pos = 0;
-}
-
struct receiver_state {
RRDHOST *host;
pid_t tid;
@@ -421,6 +378,7 @@ struct receiver_state {
time_t rrdpush_replication_step;
char *rrdpush_destination; // DONT FREE - it is allocated in appconfig
unsigned int rrdpush_compression;
+ STREAM_CAPABILITIES compression_priorities[COMPRESSION_ALGORITHM_MAX];
} config;
#ifdef ENABLE_HTTPS
@@ -429,17 +387,24 @@ struct receiver_state {
time_t replication_first_time_t;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
struct decompressor_state decompressor;
-#endif // ENABLE_RRDPUSH_COMPRESSION
/*
struct {
uint32_t count;
STREAM_NODE_INSTANCE *array;
} instances;
*/
+
+#ifdef ENABLE_H2O
+ void *h2o_ctx;
+#endif
};
+#ifdef ENABLE_H2O
+#define is_h2o_rrdpush(x) ((x)->h2o_ctx != NULL)
+#define unless_h2o_rrdpush(x) if(!is_h2o_rrdpush(x))
+#endif
+
struct rrdpush_destinations {
STRING *destination;
bool ssl;
@@ -453,9 +418,7 @@ struct rrdpush_destinations {
};
extern unsigned int default_rrdpush_enabled;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
extern unsigned int default_rrdpush_compression_enabled;
-#endif // ENABLE_RRDPUSH_COMPRESSION
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
@@ -498,11 +461,10 @@ void rrdpush_send_dyncfg(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
-int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx);
void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
-void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
int connect_to_one_of_destinations(
RRDHOST *host,
int default_port,
@@ -514,18 +476,15 @@ int connect_to_one_of_destinations(
void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-struct compressor_state *create_compressor();
-#endif // ENABLE_RRDPUSH_COMPRESSION
-
void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
-void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
+void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender);
int32_t stream_capabilities_to_vn(uint32_t caps);
+void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps);
void receiver_state_free(struct receiver_state *rpt);
bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason);
@@ -781,6 +740,13 @@ void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char
void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name);
void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type);
-void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);//x
+void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);
+void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name);
+
+bool rrdpush_compression_initialize(struct sender_state *s);
+bool rrdpush_decompression_initialize(struct receiver_state *rpt);
+void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order);
+void rrdpush_select_receiver_compression_algorithm(struct receiver_state *rpt);
+void rrdpush_compression_deactivate(struct sender_state *s);
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 71f875034..09b67e968 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1,31 +1,37 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
-
-#define WORKER_SENDER_JOB_CONNECT 0
-#define WORKER_SENDER_JOB_PIPE_READ 1
-#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
-#define WORKER_SENDER_JOB_EXECUTE 3
-#define WORKER_SENDER_JOB_SOCKET_SEND 4
-#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
-#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
-#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
-#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
-#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
-#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
-#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
-#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
-#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
-#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
-#define WORKER_SENDER_JOB_BUFFER_RATIO 15
-#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
-#define WORKER_SENDER_JOB_BYTES_SENT 17
-#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
-#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
-#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
-
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
+#include "common.h"
+#include "aclk/https_client.h"
+
+#define WORKER_SENDER_JOB_CONNECT 0
+#define WORKER_SENDER_JOB_PIPE_READ 1
+#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
+#define WORKER_SENDER_JOB_EXECUTE 3
+#define WORKER_SENDER_JOB_SOCKET_SEND 4
+#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
+#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
+#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
+#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
+#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
+#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
+#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
+#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
+#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
+#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
+#define WORKER_SENDER_JOB_BUFFER_RATIO 15
+#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
+#define WORKER_SENDER_JOB_BYTES_SENT 17
+#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18
+#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19
+#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20
+#define WORKER_SENDER_JOB_REPLAY_REQUEST 21
+#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22
+#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23
+#define WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION 24
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 25
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 25
#endif
extern struct config stream_config;
@@ -66,21 +72,6 @@ BUFFER *sender_start(struct sender_state *s) {
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
-/*
-* In case of stream compression buffer overflow
-* Inform the user through the error log file and
-* deactivate compression by downgrading the stream protocol.
-*/
-static inline void deactivate_compression(struct sender_state *s) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
- netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it.");
- s->flags &= ~SENDER_FLAG_COMPRESSION;
- netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
- rrdpush_sender_thread_close_socket(s->host);
-}
-#endif
-
#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
// Collector thread finishing a transmission
@@ -102,13 +93,22 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
sender_lock(s);
-// FILE *fp = fopen("/tmp/stream.txt", "a");
-// fprintf(fp,
-// "\n--- SEND BEGIN: %s ----\n"
-// "%s"
-// "--- SEND END ----------------------------------------\n"
-// , rrdhost_hostname(s->host), src);
-// fclose(fp);
+#ifdef NETDATA_LOG_STREAM_SENDER
+ if(type == STREAM_TRAFFIC_TYPE_METADATA) {
+ if(!s->stream_log_fp) {
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "/tmp/stream-sender-%s.txt", s->host ? rrdhost_hostname(s->host) : "unknown");
+
+ s->stream_log_fp = fopen(filename, "w");
+ }
+
+ fprintf(s->stream_log_fp, "\n--- SEND MESSAGE START: %s ----\n"
+ "%s"
+ "--- SEND MESSAGE END ----------------------------------------\n"
+ , rrdhost_hostname(s->host), src
+ );
+ }
+#endif
if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
@@ -117,8 +117,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) {
+ if (s->compressor.initialized) {
while(src_len) {
size_t size_to_compress = src_len;
@@ -143,28 +142,45 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
}
}
- char *dst;
+ const char *dst;
size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if (!dst_len) {
netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
rrdhost_hostname(s->host), s->connected_to);
- rrdpush_compressor_reset(&s->compressor);
+ rrdpush_compression_initialize(s);
dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if(!dst_len) {
netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
rrdhost_hostname(s->host), s->connected_to);
- deactivate_compression(s);
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
+ rrdpush_compression_deactivate(s);
+ rrdpush_sender_thread_close_socket(s->host);
sender_unlock(s);
return;
}
}
- if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
+ rrdpush_signature_t signature = rrdpush_compress_encode_signature(dst_len);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ // check if reversing the signature provides the same length
+ size_t decoded_dst_len = rrdpush_decompress_decode_signature((const char *)&signature, sizeof(signature));
+ if(decoded_dst_len != dst_len)
+ fatal("RRDPUSH COMPRESSION: invalid signature, original payload %zu bytes, "
+ "compressed payload length %zu bytes, but signature says payload is %zu bytes",
+ size_to_compress, dst_len, decoded_dst_len);
+#endif
+
+ if(cbuffer_add_unsafe(s->buffer, (const char *)&signature, sizeof(signature)))
s->flags |= SENDER_FLAG_OVERFLOW;
- else
- s->sent_bytes_on_this_connection_per_type[type] += dst_len;
+ else {
+ if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
+ s->flags |= SENDER_FLAG_OVERFLOW;
+ else
+ s->sent_bytes_on_this_connection_per_type[type] += dst_len + sizeof(signature);
+ }
src = src + size_to_compress;
src_len -= size_to_compress;
@@ -174,12 +190,6 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
s->flags |= SENDER_FLAG_OVERFLOW;
else
s->sent_bytes_on_this_connection_per_type[type] += src_len;
-#else
- if(cbuffer_add_unsafe(s->buffer, src, src_len))
- s->flags |= SENDER_FLAG_OVERFLOW;
- else
- s->sent_bytes_on_this_connection_per_type[type] += src_len;
-#endif
replication_recalculate_buffer_used_ratio_unsafe(s);
@@ -191,7 +201,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
sender_unlock(s);
- if(signal_sender)
+ if(signal_sender && (!stream_has_capability(s, STREAM_CAP_INTERPOLATED) || type != STREAM_TRAFFIC_TYPE_DATA))
rrdpush_signal_sender_to_wake_up(s);
}
@@ -251,15 +261,17 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
RRDSET *st;
rrdset_foreach_read(st, host) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- st->upstream_resync_time_s = 0;
+ st->rrdpush.sender.resync_time_s = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- rrddim_clear_exposed(rd);
+ rrddim_metadata_exposed_upstream_clear(rd);
rrddim_foreach_done(rd);
+
+ rrdset_metadata_updated(st);
}
rrdset_foreach_done(st);
@@ -342,8 +354,7 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
rrdpush_sender_charts_and_replication_reset(host);
}
-void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
-{
+void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) {
se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz("");
se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz("");
se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz("");
@@ -351,128 +362,155 @@ void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz("");
}
-void rrdpush_clean_encoded(stream_encoded_t *se)
-{
- if (se->os_name)
+void rrdpush_clean_encoded(stream_encoded_t *se) {
+ if (se->os_name) {
freez(se->os_name);
+ se->os_name = NULL;
+ }
- if (se->os_id)
+ if (se->os_id) {
freez(se->os_id);
+ se->os_id = NULL;
+ }
- if (se->os_version)
+ if (se->os_version) {
freez(se->os_version);
+ se->os_version = NULL;
+ }
- if (se->kernel_name)
+ if (se->kernel_name) {
freez(se->kernel_name);
+ se->kernel_name = NULL;
+ }
- if (se->kernel_version)
+ if (se->kernel_version) {
freez(se->kernel_version);
+ se->kernel_version = NULL;
+ }
}
struct {
const char *response;
+ const char *status;
size_t length;
int32_t version;
bool dynamic;
const char *error;
int worker_job_id;
int postpone_reconnect_seconds;
- bool prevent_log;
+ ND_LOG_FIELD_PRIORITY priority;
} stream_responses[] = {
{
.response = START_STREAMING_PROMPT_VN,
.length = sizeof(START_STREAMING_PROMPT_VN) - 1,
+ .status = RRDPUSH_STATUS_CONNECTED,
.version = STREAM_HANDSHAKE_OK_V3, // and above
.dynamic = true, // dynamic = we will parse the version / capabilities
.error = NULL,
.worker_job_id = 0,
.postpone_reconnect_seconds = 0,
+ .priority = NDLP_INFO,
},
{
.response = START_STREAMING_PROMPT_V2,
.length = sizeof(START_STREAMING_PROMPT_V2) - 1,
+ .status = RRDPUSH_STATUS_CONNECTED,
.version = STREAM_HANDSHAKE_OK_V2,
.dynamic = false,
.error = NULL,
.worker_job_id = 0,
.postpone_reconnect_seconds = 0,
+ .priority = NDLP_INFO,
},
{
.response = START_STREAMING_PROMPT_V1,
.length = sizeof(START_STREAMING_PROMPT_V1) - 1,
+ .status = RRDPUSH_STATUS_CONNECTED,
.version = STREAM_HANDSHAKE_OK_V1,
.dynamic = false,
.error = NULL,
.worker_job_id = 0,
.postpone_reconnect_seconds = 0,
+ .priority = NDLP_INFO,
},
{
.response = START_STREAMING_ERROR_SAME_LOCALHOST,
.length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
+ .status = RRDPUSH_STATUS_LOCALHOST,
.version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
.dynamic = false,
.error = "remote server rejected this stream, the host we are trying to stream is its localhost",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
- .prevent_log = true,
+ .priority = NDLP_DEBUG,
},
{
.response = START_STREAMING_ERROR_ALREADY_STREAMING,
.length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
+ .status = RRDPUSH_STATUS_ALREADY_CONNECTED,
.version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
.dynamic = false,
.error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minutes
- .prevent_log = true,
+ .priority = NDLP_DEBUG,
},
{
.response = START_STREAMING_ERROR_NOT_PERMITTED,
.length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
+ .status = RRDPUSH_STATUS_PERMISSION_DENIED,
.version = STREAM_HANDSHAKE_ERROR_DENIED,
.dynamic = false,
.error = "remote server denied access, probably we don't have the right API key?",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
+ .priority = NDLP_ERR,
},
{
.response = START_STREAMING_ERROR_BUSY_TRY_LATER,
.length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1,
+ .status = RRDPUSH_STATUS_RATE_LIMIT,
.version = STREAM_HANDSHAKE_BUSY_TRY_LATER,
.dynamic = false,
.error = "remote server is currently busy, we should try later",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ .priority = NDLP_NOTICE,
},
{
.response = START_STREAMING_ERROR_INTERNAL_ERROR,
.length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1,
+ .status = RRDPUSH_STATUS_INTERNAL_SERVER_ERROR,
.version = STREAM_HANDSHAKE_INTERNAL_ERROR,
.dynamic = false,
.error = "remote server is encountered an internal error, we should try later",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 5 * 60, // 5 minutes
+ .priority = NDLP_CRIT,
},
{
.response = START_STREAMING_ERROR_INITIALIZATION,
.length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1,
+ .status = RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS,
.version = STREAM_HANDSHAKE_INITIALIZATION,
.dynamic = false,
.error = "remote server is initializing, we should try later",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minute
+ .priority = NDLP_NOTICE,
},
// terminator
{
.response = NULL,
.length = 0,
+ .status = RRDPUSH_STATUS_BAD_HANDSHAKE,
.version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
.dynamic = false,
.error = "remote node response is not understood, is it Netdata?",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
- .prevent_log = false,
+ .priority = NDLP_ERR,
}
};
@@ -502,8 +540,9 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
return true;
}
- bool prevent_log = stream_responses[i].prevent_log;
+ ND_LOG_FIELD_PRIORITY priority = stream_responses[i].priority;
const char *error = stream_responses[i].error;
+ const char *status = stream_responses[i].status;
int worker_job_id = stream_responses[i].worker_job_id;
int delay = stream_responses[i].postpone_reconnect_seconds;
@@ -512,19 +551,29 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
host->destination->reason = version;
host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
- char buf[LOG_DATE_LENGTH];
- log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- if(prevent_log)
- internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
- rrdhost_hostname(host), s->connected_to, error, delay, buf);
- else
- netdata_log_error("STREAM %s [send to %s]: %s - will retry in %d secs, at %s",
- rrdhost_hostname(host), s->connected_to, error, delay, buf);
+ char buf[RFC3339_MAX_LENGTH];
+ rfc3339_datetime_ut(buf, sizeof(buf), host->destination->postpone_reconnection_until * USEC_PER_SEC, 0, false);
+
+ nd_log(NDLS_DAEMON, priority,
+ "STREAM %s [send to %s]: %s - will retry in %d secs, at %s",
+ rrdhost_hostname(host), s->connected_to, error, delay, buf);
return false;
}
+unsigned char alpn_proto_list[] = {
+ 18, 'n', 'e', 't', 'd', 'a', 't', 'a', '_', 's', 't', 'r', 'e', 'a', 'm', '/', '2', '.', '0',
+ 8, 'h', 't', 't', 'p', '/', '1', '.', '1'
+};
+
+#define CONN_UPGRADE_VAL "upgrade"
+
static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
#ifdef ENABLE_HTTPS
RRDHOST *host = s->host;
@@ -535,10 +584,16 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
if(!ssl_required)
return true;
- if (netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) {
+ if (netdata_ssl_open_ext(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket, alpn_proto_list, sizeof(alpn_proto_list))) {
if(!netdata_ssl_connect(&host->sender->ssl)) {
// couldn't connect
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_SSL_ERROR),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
rrdpush_sender_thread_close_socket(host);
host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
@@ -550,6 +605,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
security_test_certificate(host->sender->ssl.conn)) {
// certificate is not valid
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
rrdpush_sender_thread_close_socket(host);
@@ -561,6 +622,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
return true;
}
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
netdata_log_error("SSL: failed to establish connection.");
return false;
@@ -570,6 +637,104 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
#endif
}
+static int rrdpush_http_upgrade_prelude(RRDHOST *host, struct sender_state *s) {
+
+ char http[HTTP_HEADER_SIZE + 1];
+ snprintfz(http, HTTP_HEADER_SIZE,
+ "GET " NETDATA_STREAM_URL HTTP_1_1 HTTP_ENDL
+ "Upgrade: " NETDATA_STREAM_PROTO_NAME HTTP_ENDL
+ "Connection: Upgrade"
+ HTTP_HDR_END);
+
+ ssize_t bytes = send_timeout(
+#ifdef ENABLE_HTTPS
+ &host->sender->ssl,
+#endif
+ s->rrdpush_sender_socket,
+ http,
+ strlen(http),
+ 0,
+ 1000);
+
+ bytes = recv_timeout(
+#ifdef ENABLE_HTTPS
+ &host->sender->ssl,
+#endif
+ s->rrdpush_sender_socket,
+ http,
+ HTTP_HEADER_SIZE,
+ 0,
+ 1000);
+
+ if (bytes <= 0) {
+ error_report("Error reading from remote");
+ return 1;
+ }
+
+ rbuf_t buf = rbuf_create(bytes);
+ rbuf_push(buf, http, bytes);
+
+ http_parse_ctx ctx;
+ http_parse_ctx_create(&ctx);
+ ctx.flags |= HTTP_PARSE_FLAG_DONT_WAIT_FOR_CONTENT;
+
+ int rc;
+// while((rc = parse_http_response(buf, &ctx)) == HTTP_PARSE_NEED_MORE_DATA);
+ rc = parse_http_response(buf, &ctx);
+
+ if (rc != HTTP_PARSE_SUCCESS) {
+ error_report("Failed to parse HTTP response sent. (%d)", rc);
+ goto err_cleanup;
+ }
+ if (ctx.http_code == HTTP_RESP_MOVED_PERM) {
+ const char *hdr = get_http_header_by_name(&ctx, "location");
+ if (hdr)
+ error_report("HTTP response is %d Moved Permanently (location: \"%s\") instead of expected %d Switching Protocols.", ctx.http_code, hdr, HTTP_RESP_SWITCH_PROTO);
+ else
+ error_report("HTTP response is %d instead of expected %d Switching Protocols.", ctx.http_code, HTTP_RESP_SWITCH_PROTO);
+ goto err_cleanup;
+ }
+ if (ctx.http_code == HTTP_RESP_NOT_FOUND) {
+ error_report("HTTP response is %d instead of expected %d Switching Protocols. Parent version too old.", ctx.http_code, HTTP_RESP_SWITCH_PROTO);
+ // TODO set some flag here that will signify parent is older version
+ // and to try connection without rrdpush_http_upgrade_prelude next time
+ goto err_cleanup;
+ }
+ if (ctx.http_code != HTTP_RESP_SWITCH_PROTO) {
+ error_report("HTTP response is %d instead of expected %d Switching Protocols", ctx.http_code, HTTP_RESP_SWITCH_PROTO);
+ goto err_cleanup;
+ }
+
+ const char *hdr = get_http_header_by_name(&ctx, "connection");
+ if (!hdr) {
+ error_report("Missing \"connection\" header in reply");
+ goto err_cleanup;
+ }
+ if (strncmp(hdr, CONN_UPGRADE_VAL, strlen(CONN_UPGRADE_VAL))) {
+ error_report("Expected \"connection: " CONN_UPGRADE_VAL "\"");
+ goto err_cleanup;
+ }
+
+ hdr = get_http_header_by_name(&ctx, "upgrade");
+ if (!hdr) {
+ error_report("Missing \"upgrade\" header in reply");
+ goto err_cleanup;
+ }
+ if (strncmp(hdr, NETDATA_STREAM_PROTO_NAME, strlen(NETDATA_STREAM_PROTO_NAME))) {
+ error_report("Expected \"upgrade: " NETDATA_STREAM_PROTO_NAME "\"");
+ goto err_cleanup;
+ }
+
+ netdata_log_debug(D_STREAM, "Stream sender upgrade to \"" NETDATA_STREAM_PROTO_NAME "\" successful");
+ rbuf_free(buf);
+ http_parse_ctx_destroy(&ctx);
+ return 0;
+err_cleanup:
+ rbuf_free(buf);
+ http_parse_ctx_destroy(&ctx);
+ return 1;
+}
+
static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
struct timeval tv = {
@@ -600,12 +765,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
// reset our capabilities to default
s->capabilities = stream_our_capabilities(host, true);
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- // If we don't want compression, remove it from our capabilities
- if(!(s->flags & SENDER_FLAG_COMPRESSION))
- s->capabilities &= ~STREAM_CAP_COMPRESSION;
-#endif // ENABLE_RRDPUSH_COMPRESSION
-
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
*/
@@ -660,7 +819,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
"&NETDATA_SYSTEM_TOTAL_RAM=%s"
"&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s"
"&NETDATA_PROTOCOL_VERSION=%s"
- " HTTP/1.1\r\n"
+ HTTP_1_1 HTTP_ENDL
"User-Agent: %s/%s\r\n"
"Accept: */*\r\n\r\n"
, host->rrdpush_send_api_key
@@ -715,6 +874,20 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(!rrdpush_sender_connect_ssl(s))
return false;
+ if (s->parent_using_h2o && rrdpush_http_upgrade_prelude(host, s)) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->reason = STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
+ return false;
+ }
+
ssize_t bytes, len = (ssize_t)strlen(http);
bytes = send_timeout(
@@ -728,9 +901,19 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
timeout);
if(bytes <= 0) { // timeout is 0
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
+
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s]: failed to send HTTP header to remote netdata.",
+ rrdhost_hostname(host), s->connected_to);
+
host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
return false;
@@ -747,41 +930,62 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
timeout);
if(bytes <= 0) { // timeout is 0
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
+
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s]: remote netdata does not respond.",
+ rrdhost_hostname(host), s->connected_to);
+
host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
return false;
}
if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
- netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "STREAM %s [send to %s]: cannot set non-blocking mode for socket.",
+ rrdhost_hostname(host), s->connected_to);
if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
- netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "STREAM %s [send to %s]: cannot enlarge the socket buffer.",
+ rrdhost_hostname(host), s->connected_to);
http[bytes] = '\0';
- netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http);
if(!rrdpush_sender_validate_response(host, s, http, bytes))
return false;
-#ifdef ENABLE_RRDPUSH_COMPRESSION
- if(stream_has_capability(s, STREAM_CAP_COMPRESSION))
- rrdpush_compressor_reset(&s->compressor);
- else
- rrdpush_compressor_destroy(&s->compressor);
-#endif // ENABLE_RRDPUSH_COMPRESSION
+ rrdpush_compression_initialize(s);
log_sender_capabilities(s);
- netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CONNECTED),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "STREAM %s: connected to %s...",
+ rrdhost_hostname(host), s->connected_to);
return true;
}
-static bool attempt_to_connect(struct sender_state *state)
-{
+static bool attempt_to_connect(struct sender_state *state) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
state->send_attempts = 0;
// reset the bytes we have sent for this session
@@ -950,6 +1154,12 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
void execute_commands(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &s->line),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
*end = 0;
while( start < end && (newline = strchr(start, '\n')) ) {
@@ -963,27 +1173,22 @@ void execute_commands(struct sender_state *s) {
continue;
}
- netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
- gettid(), s->connected_to, rrdhost_hostname(s->host), start);
-
- // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
-
- char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS);
-
- const char *keyword = get_word(words, num_words, 0);
+ s->line.count++;
+ s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(s->line.words, s->line.num_words, 0);
- if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
+ if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
- char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1);
- char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2);
- char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3);
+ char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2);
+ char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
rrdhost_hostname(s->host), s->connected_to,
- keyword,
+ command,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
@@ -1021,9 +1226,12 @@ void execute_commands(struct sender_state *s) {
memset(&s->function_payload, 0, sizeof(struct function_payload_state));
}
}
- else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
+
if (s->receiving_function_payload) {
- netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword);
+ netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload",
+ rrdhost_hostname(s->host), s->connected_to, command);
s->receiving_function_payload = false;
buffer_free(s->function_payload.payload);
s->function_payload.payload = NULL;
@@ -1031,14 +1239,14 @@ void execute_commands(struct sender_state *s) {
// TODO send error response
}
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = get_word(s->line.words, s->line.num_words, 2);
+ char *function = get_word(s->line.words, s->line.num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
rrdhost_hostname(s->host), s->connected_to,
- keyword,
+ command,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
@@ -1047,30 +1255,32 @@ void execute_commands(struct sender_state *s) {
s->receiving_function_payload = true;
s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
- s->function_payload.txid = strdupz(get_word(words, num_words, 1));
- s->function_payload.timeout = strdupz(get_word(words, num_words, 2));
- s->function_payload.fn_name = strdupz(get_word(words, num_words, 3));
+ s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1));
+ s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2));
+ s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3));
}
- else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
+ else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL);
- char *transaction = get_word(words, num_words, 1);
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
if(transaction && *transaction)
rrd_function_cancel(transaction);
}
- else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ else if (command && strcmp(command, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL);
- const char *chart_id = get_word(words, num_words, 1);
- const char *start_streaming = get_word(words, num_words, 2);
- const char *after = get_word(words, num_words, 3);
- const char *before = get_word(words, num_words, 4);
+ const char *chart_id = get_word(s->line.words, s->line.num_words, 1);
+ const char *start_streaming = get_word(s->line.words, s->line.num_words, 2);
+ const char *after = get_word(s->line.words, s->line.num_words, 3);
+ const char *before = get_word(s->line.words, s->line.num_words, 4);
if (!chart_id || !start_streaming || !after || !before) {
netdata_log_error("STREAM %s [send to %s] %s command is incomplete"
" (chart=%s, start_streaming=%s, after=%s, before=%s)",
rrdhost_hostname(s->host), s->connected_to,
- keyword,
+ command,
chart_id ? chart_id : "(unset)",
start_streaming ? start_streaming : "(unset)",
after ? after : "(unset)",
@@ -1085,12 +1295,14 @@ void execute_commands(struct sender_state *s) {
}
}
else {
- netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
+ netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, s->line.words[0]?s->line.words[0]:"(unset)");
}
+ line_splitter_reset(&s->line);
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
start = newline + 1;
}
+
if (start < end) {
memmove(s->read_buffer, start, end-start);
s->read_len = end - start;
@@ -1245,6 +1457,14 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
rrdhost_clear_sender___while_having_sender_mutex(host);
+
+#ifdef NETDATA_LOG_STREAM_SENDER
+ if(host->sender->stream_log_fp) {
+ fclose(host->sender->stream_log_fp);
+ host->sender->stream_log_fp = NULL;
+ }
+#endif
+
sender_unlock(host->sender);
freez(s->pipe_buffer);
@@ -1277,7 +1497,61 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) {
#endif
}
+static bool stream_sender_log_capabilities(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state)
+ return false;
+
+ stream_capabilities_to_string(wb, state->capabilities);
+ return true;
+}
+
+static bool stream_sender_log_transport(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state)
+ return false;
+
+#ifdef ENABLE_HTTPS
+ buffer_strcat(wb, SSL_connection(&state->ssl) ? "https" : "http");
+#else
+ buffer_strcat(wb, "http");
+#endif
+ return true;
+}
+
+static bool stream_sender_log_dst_ip(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state || state->rrdpush_sender_socket == -1)
+ return false;
+
+ SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket);
+ buffer_strcat(wb, peers.peer.ip);
+ return true;
+}
+
+static bool stream_sender_log_dst_port(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state || state->rrdpush_sender_socket == -1)
+ return false;
+
+ SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket);
+ buffer_print_uint64(wb, peers.peer.port);
+ return true;
+}
+
void *rrdpush_sender_thread(void *ptr) {
+ struct sender_state *s = ptr;
+
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_STR(NDF_NIDL_NODE, s->host->hostname),
+ ND_LOG_FIELD_CB(NDF_DST_IP, stream_sender_log_dst_ip, s),
+ ND_LOG_FIELD_CB(NDF_DST_PORT, stream_sender_log_dst_port, s),
+ ND_LOG_FIELD_CB(NDF_DST_TRANSPORT, stream_sender_log_transport, s),
+ ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_sender_log_capabilities, s),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_register("STREAMSND");
worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
@@ -1296,6 +1570,7 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION, "disconnect cant upgrade");
worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
@@ -1303,10 +1578,11 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, "bytes compressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, "bytes uncompressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, "cumulative compression savings ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
- struct sender_state *s = ptr;
-
if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
!*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
!*s->host->rrdpush_send_api_key) {
@@ -1342,6 +1618,9 @@ void *rrdpush_sender_thread(void *ptr) {
"initial clock resync iterations",
remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
+ s->parent_using_h2o = appconfig_get_boolean(
+ &stream_config, CONFIG_SECTION_STREAM, "parent using h2o", false);
+
// initialize rrdpush globals
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
@@ -1397,7 +1676,10 @@ void *rrdpush_sender_thread(void *ptr) {
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "STREAM %s [send to %s]: enabling metrics streaming...",
+ rrdhost_hostname(s->host), s->connected_to);
continue;
}
@@ -1423,6 +1705,15 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
}
+
+ if(s->compressor.initialized) {
+ size_t bytes_uncompressed = s->compressor.sender_locked.total_uncompressed;
+ size_t bytes_compressed = s->compressor.sender_locked.total_compressed + s->compressor.sender_locked.total_compressions * sizeof(rrdpush_signature_t);
+ NETDATA_DOUBLE ratio = 100.0 - ((NETDATA_DOUBLE)bytes_compressed * 100.0 / (NETDATA_DOUBLE)bytes_uncompressed);
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_uncompressed);
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, (NETDATA_DOUBLE)bytes_compressed);
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, ratio);
+ }
sender_unlock(s);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
@@ -1459,7 +1750,7 @@ void *rrdpush_sender_thread(void *ptr) {
}
};
- int poll_rc = poll(fds, 2, 1000);
+ int poll_rc = poll(fds, 2, 50); // timeout in milliseconds
netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
fds[Collector].revents, fds[Socket].revents, outstanding);
diff --git a/streaming/stream.conf b/streaming/stream.conf
index 94e94cab7..36213af02 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -170,6 +170,9 @@
# You can control stream compression in this parent agent stream with options: yes | no
#enable compression = yes
+ # select the order the compression algorithms will be used, when multiple are offered by the child
+ #compression algorithms order = zstd lz4 brotli gzip
+
# Replication
# Enable replication for all hosts using this api key. Default: enabled
#enable replication = yes
@@ -180,6 +183,11 @@
# The duration we want to replicate per each step.
#replication_step = 600
+ # Indicate whether this child is an ephemeral node. An ephemeral node will become unavailable
+ # after the specified duration of "cleanup ephemeral hosts after secs" (as defined in the db section of netdata.conf)
+ # from the time of the node's last connection.
+ #is ephemeral node = false
+
# -----------------------------------------------------------------------------
# 3. PER SENDING HOST SETTINGS, ON PARENT NETDATA
# THIS IS OPTIONAL - YOU DON'T HAVE TO CONFIGURE IT
@@ -250,3 +258,8 @@
# The duration we want to replicate per each step.
#replication_step = 600
+
+ # Indicate whether this child is an ephemeral node. An ephemeral node will become unavailable
+ # after the specified duration of "cleanup ephemeral hosts after secs" (as defined in the db section of netdata.conf)
+ # from the time of the node's last connection.
+ #is ephemeral node = false