summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-11-30 18:47:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-11-30 18:47:00 +0000
commit03bf87dcb06f7021bfb2df2fa8691593c6148aff (patch)
treee16b06711a2ed77cafb4b7754be0220c3d14a9d7 /streaming
parentAdding upstream version 1.36.1. (diff)
downloadnetdata-03bf87dcb06f7021bfb2df2fa8691593c6148aff.tar.xz
netdata-03bf87dcb06f7021bfb2df2fa8691593c6148aff.zip
Adding upstream version 1.37.0.upstream/1.37.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/README.md3
-rw-r--r--streaming/compression.c301
-rw-r--r--streaming/receiver.c708
-rw-r--r--streaming/replication.c1407
-rw-r--r--streaming/replication.h33
-rw-r--r--streaming/rrdpush.c613
-rw-r--r--streaming/rrdpush.h232
-rw-r--r--streaming/sender.c1255
-rw-r--r--streaming/stream.conf78
9 files changed, 3410 insertions, 1220 deletions
diff --git a/streaming/README.md b/streaming/README.md
index 57c392f40..58eb2cc1b 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -234,8 +234,7 @@ For Netdata v1.9+, streaming can also be monitored via `access.log`.
Netdata does not activate TLS encryption by default. To encrypt streaming connections:
1. On the parent node (receiving node), [enable TLS support](/web/server/README.md#enabling-tls-support).
-2. On the child node (sending node), [enable TLS support](/web/server/README.md#enabling-tls-support).
-3. On the child's `stream.conf`, configure the destination as follows:
+2. On the child's `stream.conf`, configure the destination as follows:
```
[stream]
diff --git a/streaming/compression.c b/streaming/compression.c
index d6178d6c3..7ba9dbf19 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -5,9 +5,7 @@
#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
-#define LZ4_MAX_MSG_SIZE 0x4000
-#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE)
-
+// signature MUST end with a newline
#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
#define SIGNATURE_SIZE 4
@@ -18,8 +16,9 @@
*/
struct compressor_data {
LZ4_stream_t *stream;
- char *stream_buffer;
- size_t stream_buffer_pos;
+ char *input_ring_buffer;
+ size_t input_ring_buffer_size;
+ size_t input_ring_buffer_pos;
};
@@ -31,9 +30,9 @@ static void lz4_compressor_reset(struct compressor_state *state)
if (state->data) {
if (state->data->stream) {
LZ4_resetStream_fast(state->data->stream);
- info("%s: Compressor Reset", STREAM_COMPRESSION_MSG);
+ internal_error(true, "%s: compressor reset", STREAM_COMPRESSION_MSG);
}
- state->data->stream_buffer_pos = 0;
+ state->data->input_ring_buffer_pos = 0;
}
}
@@ -47,10 +46,10 @@ static void lz4_compressor_destroy(struct compressor_state **state)
if (s->data) {
if (s->data->stream)
LZ4_freeStream(s->data->stream);
- freez(s->data->stream_buffer);
+ freez(s->data->input_ring_buffer);
freez(s->data);
}
- freez(s->buffer);
+ freez(s->compression_result_buffer);
freez(s);
*state = NULL;
debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG);
@@ -65,37 +64,53 @@ static void lz4_compressor_destroy(struct compressor_state **state)
*/
static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
{
- if (!state || !size || !out)
+ if(unlikely(!state || !size || !out))
return 0;
- if (size > LZ4_MAX_MSG_SIZE) {
- error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, size, LZ4_MAX_MSG_SIZE);
+
+ if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) {
+ error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE);
return 0;
}
+
size_t max_dst_size = LZ4_COMPRESSBOUND(size);
size_t data_size = max_dst_size + SIGNATURE_SIZE;
- if (!state->buffer) {
- state->buffer = mallocz(data_size);
- state->buffer_size = data_size;
- } else if (state->buffer_size < data_size) {
- state->buffer = reallocz(state->buffer, data_size);
- state->buffer_size = data_size;
+ if (!state->compression_result_buffer) {
+ state->compression_result_buffer = mallocz(data_size);
+ state->compression_result_buffer_size = data_size;
+ }
+ else if(unlikely(state->compression_result_buffer_size < data_size)) {
+ state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size);
+ state->compression_result_buffer_size = data_size;
}
- memcpy(state->data->stream_buffer + state->data->stream_buffer_pos, data, size);
- long int compressed_data_size = LZ4_compress_fast_continue(state->data->stream,
- state->data->stream_buffer + state->data->stream_buffer_pos,
- state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1);
+ // the ring buffer always has space for LZ4_MAX_MSG_SIZE
+ memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size);
+
+ // this call needs the last 64K of our previous data
+ // they are available in the ring buffer
+ long int compressed_data_size = LZ4_compress_fast_continue(
+ state->data->stream,
+ state->data->input_ring_buffer + state->data->input_ring_buffer_pos,
+ state->compression_result_buffer + SIGNATURE_SIZE,
+ size,
+ max_dst_size,
+ 1);
+
if (compressed_data_size < 0) {
error("Data compression error: %ld", compressed_data_size);
return 0;
}
- state->data->stream_buffer_pos += size;
- if (state->data->stream_buffer_pos >= LZ4_STREAM_BUFFER_SIZE - LZ4_MAX_MSG_SIZE)
- state->data->stream_buffer_pos = 0;
+
+ // update the next writing position of the ring buffer
+ state->data->input_ring_buffer_pos += size;
+ if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE))
+ state->data->input_ring_buffer_pos = 0;
+
+ // update the signature header
uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
- *(uint32_t *)state->buffer = len | SIGNATURE;
- *out = state->buffer;
+ *(uint32_t *)state->compression_result_buffer = len | SIGNATURE;
+ *out = state->compression_result_buffer;
debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
return compressed_data_size + SIGNATURE_SIZE;
}
@@ -114,8 +129,9 @@ struct compressor_state *create_compressor()
state->data = callocz(1, sizeof(struct compressor_data));
state->data->stream = LZ4_createStream();
- state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE));
- state->buffer_size = LZ4_STREAM_BUFFER_SIZE;
+ state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2);
+ state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size);
+ state->compression_result_buffer_size = 0;
state->reset(state);
debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG);
return state;
@@ -124,11 +140,12 @@ struct compressor_state *create_compressor()
/*
* LZ4 streaming API decompressor specific data
*/
-struct decompressor_data {
- LZ4_streamDecode_t *stream;
- char *stream_buffer;
- size_t stream_buffer_size;
- size_t stream_buffer_pos;
+struct decompressor_stream {
+ LZ4_streamDecode_t *lz4_stream;
+ char *buffer;
+ size_t size;
+ size_t write_at;
+ size_t read_at;
};
/*
@@ -136,12 +153,12 @@ struct decompressor_data {
*/
static void lz4_decompressor_reset(struct decompressor_state *state)
{
- if (state->data) {
- if (state->data->stream)
- LZ4_setStreamDecode(state->data->stream, NULL, 0);
- state->data->stream_buffer_pos = 0;
- state->buffer_len = 0;
- state->out_buffer_len = 0;
+ if (state->stream) {
+ if (state->stream->lz4_stream)
+ LZ4_setStreamDecode(state->stream->lz4_stream, NULL, 0);
+
+ state->stream->write_at = 0;
+ state->stream->read_at = 0;
}
}
@@ -152,173 +169,129 @@ static void lz4_decompressor_destroy(struct decompressor_state **state)
{
if (state && *state) {
struct decompressor_state *s = *state;
- if (s->data) {
+ if (s->stream) {
debug(D_STREAM, "%s: Destroying decompressor.", STREAM_COMPRESSION_MSG);
- if (s->data->stream)
- LZ4_freeStreamDecode(s->data->stream);
- freez(s->data->stream_buffer);
- freez(s->data);
+ if (s->stream->lz4_stream)
+ LZ4_freeStreamDecode(s->stream->lz4_stream);
+ freez(s->stream->buffer);
+ freez(s->stream);
}
- freez(s->buffer);
freez(s);
*state = NULL;
}
}
-static size_t decode_compress_header(const char *data, size_t data_size)
-{
- if (!data || !data_size)
+static size_t decode_compress_header(const char *data, size_t data_size) {
+ if (unlikely(!data || !data_size))
return 0;
- if (data_size < SIGNATURE_SIZE)
+
+ if (unlikely(data_size != SIGNATURE_SIZE))
return 0;
+
uint32_t sign = *(uint32_t *)data;
- if ((sign & SIGNATURE_MASK) != SIGNATURE)
+ if (unlikely((sign & SIGNATURE_MASK) != SIGNATURE))
return 0;
+
size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
return length;
}
/*
- * Check input data for the compression header
- * Return the size of compressed data or 0 for uncompressed data
- */
-size_t is_compressed_data(const char *data, size_t data_size)
-{
- return decode_compress_header(data, data_size);
-}
-
-/*
* Start the collection of compressed data in an internal buffer
* Return the size of compressed data or 0 for uncompressed data
*/
-static size_t lz4_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size)
-{
- size_t length = decode_compress_header(header, header_size);
- if (!length)
- return 0;
+static size_t lz4_decompressor_start(struct decompressor_state *state __maybe_unused, const char *header, size_t header_size) {
+ if(unlikely(state->stream->read_at != state->stream->write_at))
+ fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!"
+ , STREAM_COMPRESSION_MSG);
- if (!state->buffer) {
- state->buffer = mallocz(length);
- state->buffer_size = length;
- } else if (state->buffer_size < length) {
- state->buffer = reallocz(state->buffer, length);
- state->buffer_size = length;
- }
- state->buffer_len = length;
- state->buffer_pos = 0;
- state->out_buffer_pos = 0;
- state->out_buffer_len = 0;
- return length;
+ return decode_compress_header(header, header_size);
}
/*
- * Add a chunk of compressed data to the internal buffer
- * Return the current size of compressed data or 0 for error
+ * Decompress the compressed data in the internal buffer
+ * Return the size of uncompressed data or 0 for error
*/
-static size_t lz4_decompressor_put(struct decompressor_state *state, const char *data, size_t size)
-{
- if (!state || !size || !data)
+static size_t lz4_decompressor_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+ if (unlikely(!state || !compressed_data || !compressed_size))
return 0;
- if (!state->buffer)
- fatal("STREAM: No decompressor buffer allocated");
- if (state->buffer_pos + size > state->buffer_len) {
- error("STREAM: Decompressor buffer overflow %lu + %lu > %lu",
- state->buffer_pos, size, state->buffer_len);
- size = state->buffer_len - state->buffer_pos;
+ if(unlikely(state->stream->read_at != state->stream->write_at))
+ fatal("%s: asked to decompress new data, while there are unread data in the decompression buffer!"
+ , STREAM_COMPRESSION_MSG);
+
+ if (unlikely(state->stream->write_at >= state->stream->size / 2)) {
+ state->stream->write_at = 0;
+ state->stream->read_at = 0;
}
- memcpy(state->buffer + state->buffer_pos, data, size);
- state->buffer_pos += size;
- return state->buffer_pos;
-}
-static size_t saving_percent(size_t comp_len, size_t src_len)
-{
- if (comp_len > src_len)
- comp_len = src_len;
- if (!src_len)
- return 0;
- return 100 - comp_len * 100 / src_len;
-}
+ long int decompressed_size = LZ4_decompress_safe_continue(
+ state->stream->lz4_stream
+ , compressed_data
+ , state->stream->buffer + state->stream->write_at
+ , (int)compressed_size
+ , (int)(state->stream->size - state->stream->write_at)
+ );
-/*
- * Decompress the compressed data in the internal buffer
- * Return the size of uncompressed data or 0 for error
- */
-static size_t lz4_decompressor_decompress(struct decompressor_state *state)
-{
- if (!state)
- return 0;
- if (!state->buffer) {
- error("%s: No decompressor buffer allocated", STREAM_COMPRESSION_MSG);
- return 0;
- }
-
- long int decompressed_size = LZ4_decompress_safe_continue(state->data->stream, state->buffer,
- state->data->stream_buffer + state->data->stream_buffer_pos,
- state->buffer_len, state->data->stream_buffer_size - state->data->stream_buffer_pos);
- if (decompressed_size < 0) {
- error("%s: Decompressor error %ld", STREAM_COMPRESSION_MSG, decompressed_size);
+ if (unlikely(decompressed_size < 0)) {
+ error("%s: decompressor returned negative decompressed bytes: %ld", STREAM_COMPRESSION_MSG, decompressed_size);
return 0;
}
- state->out_buffer = state->data->stream_buffer + state->data->stream_buffer_pos;
- state->data->stream_buffer_pos += decompressed_size;
- if (state->data->stream_buffer_pos >= state->data->stream_buffer_size - LZ4_MAX_MSG_SIZE)
- state->data->stream_buffer_pos = 0;
- state->out_buffer_len = decompressed_size;
- state->out_buffer_pos = 0;
+ if(unlikely(decompressed_size + state->stream->write_at > state->stream->size))
+ fatal("%s: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu"
+ , STREAM_COMPRESSION_MSG
+ , state->stream->size
+ , state->stream->write_at
+ , decompressed_size
+ , state->stream->write_at + decompressed_size - state->stream->size
+ );
- // Some compression statistics
- size_t old_avg_saving = saving_percent(state->total_compressed, state->total_uncompressed);
- size_t old_avg_size = state->packet_count ? state->total_uncompressed / state->packet_count : 0;
+ state->stream->write_at += decompressed_size;
- state->total_compressed += state->buffer_len + SIGNATURE_SIZE;
+ // statistics
+ state->total_compressed += compressed_size + SIGNATURE_SIZE;
state->total_uncompressed += decompressed_size;
state->packet_count++;
- size_t saving = saving_percent(state->buffer_len, decompressed_size);
- size_t avg_saving = saving_percent(state->total_compressed, state->total_uncompressed);
- size_t avg_size = state->total_uncompressed / state->packet_count;
-
- if (old_avg_saving != avg_saving || old_avg_size != avg_size){
- debug(D_STREAM, "%s: Saving: %lu%% (avg. %lu%%), avg.size: %lu", STREAM_COMPRESSION_MSG, saving, avg_saving, avg_size);
- }
return decompressed_size;
}
/*
* Return the size of uncompressed data left in the internal buffer or 0 for error
*/
-static size_t lz4_decompressor_decompressed_bytes_in_buffer(struct decompressor_state *state)
-{
- return state->out_buffer_len ?
- state->out_buffer_len - state->out_buffer_pos : 0;
+static size_t lz4_decompressor_decompressed_bytes_in_buffer(struct decompressor_state *state) {
+ if(unlikely(state->stream->read_at > state->stream->write_at))
+ fatal("%s: invalid read/write stream positions"
+ , STREAM_COMPRESSION_MSG);
+
+ return state->stream->write_at - state->stream->read_at;
}
/*
* Fill the buffer provided with uncompressed data from the internal buffer
* Return the size of uncompressed data copied or 0 for error
*/
-static size_t lz4_decompressor_get(struct decompressor_state *state, char *data, size_t size)
-{
- if (!state || !size || !data)
+static size_t lz4_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
+ if (unlikely(!state || !size || !dst))
return 0;
- if (!state->out_buffer)
- fatal("%s: No decompressor output buffer allocated", STREAM_COMPRESSION_MSG);
- if (state->out_buffer_pos + size > state->out_buffer_len)
- size = state->out_buffer_len - state->out_buffer_pos;
-
- char *p = state->out_buffer + state->out_buffer_pos, *endp = p + size, *last_lf = NULL;
- for (; p < endp; ++p)
- if (*p == '\n' || *p == 0)
- last_lf = p;
- if (last_lf)
- size = last_lf + 1 - (state->out_buffer + state->out_buffer_pos);
-
- memcpy(data, state->out_buffer + state->out_buffer_pos, size);
- state->out_buffer_pos += size;
- return size;
+
+ size_t remaining = lz4_decompressor_decompressed_bytes_in_buffer(state);
+ if(unlikely(!remaining))
+ return 0;
+
+ size_t bytes_to_return = size;
+ if(bytes_to_return > remaining)
+ bytes_to_return = remaining;
+
+ memcpy(dst, state->stream->buffer + state->stream->read_at, bytes_to_return);
+ state->stream->read_at += bytes_to_return;
+
+ if(unlikely(state->stream->read_at > state->stream->write_at))
+ fatal("%s: invalid read/write stream positions"
+ , STREAM_COMPRESSION_MSG);
+
+ return bytes_to_return;
}
/*
@@ -328,20 +301,20 @@ static size_t lz4_decompressor_get(struct decompressor_state *state, char *data,
struct decompressor_state *create_decompressor()
{
struct decompressor_state *state = callocz(1, sizeof(struct decompressor_state));
+ state->signature_size = SIGNATURE_SIZE;
state->reset = lz4_decompressor_reset;
state->start = lz4_decompressor_start;
- state->put = lz4_decompressor_put;
state->decompress = lz4_decompressor_decompress;
state->get = lz4_decompressor_get;
state->decompressed_bytes_in_buffer = lz4_decompressor_decompressed_bytes_in_buffer;
state->destroy = lz4_decompressor_destroy;
- state->data = callocz(1, sizeof(struct decompressor_data));
- fatal_assert(state->data);
- state->data->stream = LZ4_createStreamDecode();
- state->data->stream_buffer_size = LZ4_decoderRingBufferSize(LZ4_MAX_MSG_SIZE);
- state->data->stream_buffer = mallocz(state->data->stream_buffer_size);
- fatal_assert(state->data->stream_buffer);
+ state->stream = callocz(1, sizeof(struct decompressor_stream));
+ fatal_assert(state->stream);
+ state->stream->lz4_stream = LZ4_createStreamDecode();
+ state->stream->size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2;
+ state->stream->buffer = mallocz(state->stream->size);
+ fatal_assert(state->stream->buffer);
state->reset(state);
debug(D_STREAM, "%s: Initialize streaming decompression!", STREAM_COMPRESSION_MSG);
return state;
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 0890ebbcd..61ee33bc4 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -1,6 +1,18 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
+#include "parser/parser.h"
+
+// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
+#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
+#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2)
+
+// this has to be the same at parser.h
+#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
+
+#if WORKER_PARSER_FIRST_JOB < 1
+#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1
+#endif
extern struct config stream_config;
@@ -58,105 +70,43 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
#include "collectors/plugins.d/pluginsd_parser.h"
-PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user)
{
- UNUSED(plugins_action);
- char *remote_time_txt = words[1];
- time_t remote_time = 0;
- RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
- struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
- if (cd->version < VERSION_GAP_FILLING ) {
- error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", host->hostname, cd->cmd,
- cd->version);
- return PARSER_RC_OK; // Ignore error and continue stream
- }
- if (remote_time_txt && *remote_time_txt) {
- remote_time = str2ull(remote_time_txt);
- time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host);
- time_t gap = 0;
- if (prev == 0)
- info(
- "STREAM %s from %s: Initial connection (no gap to check), "
- "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"",
- host->hostname,
- cd->cmd,
- (int64_t)remote_time,
- (int64_t)now,
- (int64_t)now - remote_time);
- else {
- gap = now - prev;
- info(
- "STREAM %s from %s: Checking for gaps... "
- "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap",
- host->hostname,
- cd->cmd,
- (int64_t)remote_time,
- (int64_t)prev,
- (int64_t)now,
- (int64_t)(remote_time - now),
- (int64_t)gap);
- }
- char message[128];
- sprintf(
- message,
- "REPLICATE %"PRId64" %"PRId64"\n",
- (int64_t)(remote_time - gap),
- (int64_t)remote_time);
- int ret;
-#ifdef ENABLE_HTTPS
- SSL *conn = host->stream_ssl.conn ;
- if(conn && !host->stream_ssl.flags) {
- ret = SSL_write(conn, message, strlen(message));
- } else {
- ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
- }
-#else
- ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
-#endif
- if (ret != (int)strlen(message))
- error("Failed to send initial timestamp - gaps may appear in charts");
- return PARSER_RC_OK;
- }
- return PARSER_RC_ERROR;
-}
+ const char *host_uuid_str = get_word(words, num_words, 1);
+ const char *claim_id_str = get_word(words, num_words, 2);
-#define CLAIMED_ID_MIN_WORDS 3
-PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action)
-{
- UNUSED(plugins_action);
+ if (!host_uuid_str || !claim_id_str) {
+ error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
+ host_uuid_str ? host_uuid_str : "[unset]",
+ claim_id_str ? claim_id_str : "[unset]");
+ return PARSER_RC_ERROR;
+ }
- int i;
uuid_t uuid;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
- for (i = 0; words[i]; i++) ;
- if (i != CLAIMED_ID_MIN_WORDS) {
- error("Command CLAIMED_ID came malformed %d parameters are expected but %d received", CLAIMED_ID_MIN_WORDS - 1, i - 1);
- return PARSER_RC_ERROR;
- }
-
// We don't need the parsed UUID
// just do it to check the format
- if(uuid_parse(words[1], uuid)) {
- error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[1]);
+ if(uuid_parse(host_uuid_str, uuid)) {
+ error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
return PARSER_RC_ERROR;
}
- if(uuid_parse(words[2], uuid) && strcmp(words[2], "NULL")) {
- error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", words[2]);
+ if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) {
+ error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
return PARSER_RC_ERROR;
}
- if(strcmp(words[1], host->machine_guid)) {
- error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", words[1], host->machine_guid);
+ if(strcmp(host_uuid_str, host->machine_guid)) {
+ error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
return PARSER_RC_OK; //the message is OK problem must be somewhere else
}
rrdhost_aclk_state_lock(host);
if (host->aclk_state.claimed_id)
freez(host->aclk_state.claimed_id);
- host->aclk_state.claimed_id = strcmp(words[2], "NULL") ? strdupz(words[2]) : NULL;
+ host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
- store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL);
+ metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL);
rrdhost_aclk_state_unlock(host);
@@ -165,197 +115,242 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin
return PARSER_RC_OK;
}
+static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
+ if(unlikely(!size)) {
+ internal_error(true, "%s() asked to read zero bytes", __FUNCTION__);
+ return 0;
+ }
-#ifndef ENABLE_COMPRESSION
-/* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
- */
-static int receiver_read(struct receiver_state *r, FILE *fp) {
#ifdef ENABLE_HTTPS
- if (r->ssl.conn && !r->ssl.flags) {
- ERR_clear_error();
- int desired = sizeof(r->read_buffer) - r->read_len - 1;
- int ret = SSL_read(r->ssl.conn, r->read_buffer + r->read_len, desired);
- if (ret > 0 ) {
- r->read_len += ret;
- return 0;
- }
- // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket
- u_long err;
- char buf[256];
- while ((err = ERR_get_error()) != 0) {
- ERR_error_string_n(err, buf, sizeof(buf));
- error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf);
- }
- return 1;
+ if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ return (int)netdata_ssl_read(r->ssl.conn, buffer, size);
+#endif
+
+ ssize_t bytes_read = read(r->fd, buffer, size);
+ if(bytes_read == 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
+ error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
+ bytes_read = -3;
}
+ else if (bytes_read == 0) {
+ error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__);
+ bytes_read = -1;
+ }
+ else if (bytes_read < 0) {
+ error("STREAM: %s() failed to read from socket!", __FUNCTION__);
+ bytes_read = -2;
+ }
+
+// do {
+// bytes_read = (int) fread(buffer, 1, size, fp);
+// if (unlikely(bytes_read <= 0)) {
+// if(feof(fp)) {
+// internal_error(true, "%s(): fread() failed with EOF", __FUNCTION__);
+// bytes_read = -2;
+// }
+// else if(ferror(fp)) {
+// internal_error(true, "%s(): fread() failed with ERROR", __FUNCTION__);
+// bytes_read = -3;
+// }
+// else bytes_read = 0;
+// }
+// else
+// worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, bytes_read);
+// } while(bytes_read == 0);
+
+ return (int)bytes_read;
+}
+
+static bool receiver_read_uncompressed(struct receiver_state *r) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(r->read_buffer[r->read_len] != '\0')
+ fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
#endif
- if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp))
- return 1;
- r->read_len = strlen(r->read_buffer);
- return 0;
+
+ int bytes_read = read_stream(r, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1);
+ if(unlikely(bytes_read <= 0))
+ return false;
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read);
+
+ r->read_len += bytes_read;
+ r->read_buffer[r->read_len] = '\0';
+
+ return true;
}
-#else
-/*
- * The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing.
- * if SSL encryption is on, then use SSL API for reading stream data.
- * Use line oriented fgets() in buffer from receiver_state is provided.
- * In other cases use fread to read binary data from socket.
- * Return zero on success and the number of bytes were read using pointer in the last argument.
- */
-static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t size, int* ret) {
- if (!ret)
- return 1;
- *ret = 0;
-#ifdef ENABLE_HTTPS
- if (r->ssl.conn && !r->ssl.flags) {
- ERR_clear_error();
- if (buffer != r->read_buffer + r->read_len) {
- *ret = SSL_read(r->ssl.conn, buffer, size);
- if (*ret > 0 )
- return 0;
- } else {
- // we need to receive data with LF to parse compression header
- size_t ofs = 0;
- int res = 0;
- errno = 0;
- while (ofs < size) {
- do {
- res = SSL_read(r->ssl.conn, buffer + ofs, 1);
- // When either SSL_ERROR_SYSCALL (OpenSSL < 3.0) or SSL_ERROR_SSL(OpenSSL > 3.0) happens,
- // the connection was lost https://www.openssl.org/docs/man3.0/man3/SSL_get_error.html,
- // without the test we will have an infinite loop https://github.com/netdata/netdata/issues/13092
- int local_ssl_err = SSL_get_error(r->ssl.conn, res);
- if (local_ssl_err == SSL_ERROR_SYSCALL || local_ssl_err == SSL_ERROR_SSL) {
- error("The SSL connection has error SSL_ERROR_SYSCALL(%d) and system is registering errno = %d",
- local_ssl_err, errno);
- return 1;
- }
- } while (res == 0);
-
- if (res < 0)
- break;
- if (buffer[ofs] == '\n')
- break;
- ofs += res;
- }
- if (res > 0) {
- ofs += res;
- *ret = ofs;
- buffer[ofs] = 0;
- return 0;
+
+#ifdef ENABLE_COMPRESSION
+static bool receiver_read_compressed(struct receiver_state *r) {
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(r->read_buffer[r->read_len] != '\0')
+ fatal("%s: read_buffer does not start with zero #2", __FUNCTION__ );
+#endif
+
+ // first use any available uncompressed data
+ if (r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
+ size_t available = sizeof(r->read_buffer) - r->read_len - 1;
+ if (available) {
+ size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available);
+ if (!len) {
+ internal_error(true, "decompressor returned zero length #1");
+ return false;
}
+
+ r->read_len += (int)len;
+ r->read_buffer[r->read_len] = '\0';
}
- // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket
- u_long err;
- char buf[256];
- while ((err = ERR_get_error()) != 0) {
- ERR_error_string_n(err, buf, sizeof(buf));
- error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf);
- }
- return 1;
+ else
+ internal_error(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len);
+
+ return true;
}
-#endif
- if (buffer != r->read_buffer + r->read_len) {
- // read to external buffer
- *ret = fread(buffer, 1, size, fp);
- if (!*ret)
- return 1;
- } else {
- if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp))
- return 1;
- *ret = strlen(r->read_buffer);
+
+ // no decompressed data available
+ // read the compression signature of the next block
+
+ if(unlikely(r->read_len + r->decompressor->signature_size > sizeof(r->read_buffer) - 1)) {
+ internal_error(true, "The last incomplete line does not leave enough room for the next compression header! Already have %d bytes in read_buffer.", r->read_len);
+ return false;
}
- return 0;
-}
-/*
- * Get the next line of data for parsing.
- * Return data from the decompressor buffer if available.
- * Otherwise read next line from the socket and check for compression header.
- * Return the line was read If no compression header was found.
- * Otherwise read the entire block of compressed data, decompress it
- * and return it in receiver_state buffer.
- * Return zero on success.
- */
-static int receiver_read(struct receiver_state *r, FILE *fp) {
- // check any decompressed data present
- if (r->decompressor &&
- r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
- size_t available = sizeof(r->read_buffer) - r->read_len;
- if (available) {
- size_t len = r->decompressor->get(r->decompressor,
- r->read_buffer + r->read_len, available);
- if (!len)
- return 1;
- r->read_len += len;
- }
- return 0;
+ // read the compression signature from the stream
+ // we have to do a loop here, because read_stream() may return less than the data we need
+ int bytes_read = 0;
+ do {
+ int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor->signature_size - bytes_read);
+ if (unlikely(ret <= 0))
+ return false;
+
+ bytes_read += ret;
+ } while(unlikely(bytes_read < (int)r->decompressor->signature_size));
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
+
+ if(unlikely(bytes_read != (int)r->decompressor->signature_size))
+ fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor->signature_size);
+
+ size_t compressed_message_size = r->decompressor->start(r->decompressor, r->read_buffer + r->read_len, bytes_read);
+ if (unlikely(!compressed_message_size)) {
+ internal_error(true, "multiplexed uncompressed data in compressed stream!");
+ r->read_len += bytes_read;
+ r->read_buffer[r->read_len] = '\0';
+ return true;
}
- int ret = 0;
- if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret))
- return 1;
-
- if (!is_compressed_data(r->read_buffer, ret)) {
- r->read_len += ret;
- return 0;
+
+ if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) {
+ error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.",
+ compressed_message_size, (size_t)COMPRESSION_MAX_MSG_SIZE);
+ return false;
}
- if (unlikely(!r->decompressor))
- r->decompressor = create_decompressor();
-
- size_t bytes_to_read = r->decompressor->start(r->decompressor,
- r->read_buffer, ret);
+ // delete compression header from our read buffer
+ r->read_buffer[r->read_len] = '\0';
- // Read the entire block of compressed data because
- // we're unable to decompress incomplete block
- char compressed[bytes_to_read];
+ // Read the entire compressed block of compressed data
+ char compressed[compressed_message_size];
+ size_t compressed_bytes_read = 0;
do {
- if (read_stream(r, fp, compressed, bytes_to_read, &ret))
- return 1;
- // Send input data to decompressor
- if (ret)
- r->decompressor->put(r->decompressor, compressed, ret);
- bytes_to_read -= ret;
- } while (bytes_to_read > 0);
- // Decompress
- size_t bytes_to_parse = r->decompressor->decompress(r->decompressor);
- if (!bytes_to_parse)
- return 1;
- // Fill read buffer with decompressed data
- r->read_len = r->decompressor->get(r->decompressor,
- r->read_buffer, sizeof(r->read_buffer));
- return 0;
-}
+ size_t start = compressed_bytes_read;
+ size_t remaining = compressed_message_size - start;
-#endif
+ int last_read_bytes = read_stream(r, &compressed[start], remaining);
+ if (unlikely(last_read_bytes <= 0)) {
+ internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes);
+ return false;
+ }
+
+ compressed_bytes_read += last_read_bytes;
+
+ } while(unlikely(compressed_message_size > compressed_bytes_read));
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read);
+
+ // decompress the compressed block
+ size_t bytes_to_parse = r->decompressor->decompress(r->decompressor, compressed, compressed_bytes_read);
+ if (!bytes_to_parse) {
+ internal_error(true, "no bytes to parse.");
+ return false;
+ }
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse);
+
+ // fill read buffer with decompressed data
+ size_t len = (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1);
+ if (!len) {
+ internal_error(true, "decompressor returned zero length #2");
+ return false;
+ }
+ r->read_len += (int)len;
+ r->read_buffer[r->read_len] = '\0';
+
+ return true;
+}
+#else // !ENABLE_COMPRESSION
+static bool receiver_read_compressed(struct receiver_state *r) {
+ return receiver_read_uncompressed(r);
+}
+#endif // ENABLE_COMPRESSION
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
*/
-static char *receiver_next_line(struct receiver_state *r, int *pos) {
- int start = *pos, scan = *pos;
- if (scan >= r->read_len) {
+static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) {
+ size_t start = *pos;
+
+ char *ss = &r->read_buffer[start];
+ char *se = &r->read_buffer[r->read_len];
+ char *ds = buffer;
+ char *de = &buffer[buffer_length - 2];
+
+ if(ss >= se) {
+ *ds = '\0';
+ *pos = 0;
r->read_len = 0;
+ r->read_buffer[r->read_len] = '\0';
return NULL;
}
- while (scan < r->read_len && r->read_buffer[scan] != '\n')
- scan++;
- if (scan < r->read_len && r->read_buffer[scan] == '\n') {
- *pos = scan+1;
- r->read_buffer[scan] = 0;
- return &r->read_buffer[start];
+
+ // copy all bytes to buffer
+ while(ss < se && ds < de && *ss != '\n')
+ *ds++ = *ss++;
+
+ // if we have a newline, return the buffer
+ if(ss < se && ds < de && *ss == '\n') {
+ // newline found in the r->read_buffer
+
+ *ds++ = *ss++; // copy the newline too
+ *ds = '\0';
+
+ *pos = ss - r->read_buffer;
+ return buffer;
}
+
+ // if the destination is full, oops!
+ if(ds == de) {
+ error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
+ *ds = '\0';
+ *pos = ss - r->read_buffer;
+ return buffer;
+ }
+
+ // no newline found in the r->read_buffer
+ // move everything to the beginning
memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start);
- r->read_len -= start;
+ r->read_len -= (int)start;
+ r->read_buffer[r->read_len] = '\0';
+ *ds = '\0';
+ *pos = 0;
return NULL;
}
static void streaming_parser_thread_cleanup(void *ptr) {
PARSER *parser = (PARSER *)ptr;
+ rrd_collector_finished();
parser_destroy(parser);
}
-size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
+static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
size_t result;
PARSER_USER_OBJECT user = {
@@ -366,49 +361,68 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
.trust_durations = 1
};
- PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT);
+ PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
+
+ rrd_collector_started();
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(streaming_parser_thread_cleanup, parser);
- parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
- parser->plugins_action->begin_action = &pluginsd_begin_action;
- parser->plugins_action->flush_action = &pluginsd_flush_action;
- parser->plugins_action->end_action = &pluginsd_end_action;
- parser->plugins_action->disable_action = &pluginsd_disable_action;
- parser->plugins_action->variable_action = &pluginsd_variable_action;
- parser->plugins_action->dimension_action = &pluginsd_dimension_action;
- parser->plugins_action->label_action = &pluginsd_label_action;
- parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
- parser->plugins_action->chart_action = &pluginsd_chart_action;
- parser->plugins_action->set_action = &pluginsd_set_action;
- parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
- parser->plugins_action->clabel_action = &pluginsd_clabel_action;
-
user.parser = parser;
+ bool compressed_connection = false;
#ifdef ENABLE_COMPRESSION
- if (rpt->decompressor)
- rpt->decompressor->reset(rpt->decompressor);
+ if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
+ compressed_connection = true;
+
+ if (!rpt->decompressor)
+ rpt->decompressor = create_decompressor();
+ else
+ rpt->decompressor->reset(rpt->decompressor);
+ }
#endif
- do{
- if (receiver_read(rpt, fp))
+ rpt->read_buffer[0] = '\0';
+ rpt->read_len = 0;
+
+ size_t read_buffer_start = 0;
+ char buffer[PLUGINSD_LINE_MAX + 2] = "";
+ while(!netdata_exit) {
+ if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) {
+ bool have_new_data;
+ if(compressed_connection)
+ have_new_data = receiver_read_compressed(rpt);
+ else
+ have_new_data = receiver_read_uncompressed(rpt);
+
+ if(!have_new_data)
+ break;
+
+ rpt->last_msg_t = now_realtime_sec();
+ continue;
+ }
+
+ if(unlikely(netdata_exit)) {
+ internal_error(true, "exiting...");
+ goto done;
+ }
+ if(unlikely(rpt->shutdown)) {
+ internal_error(true, "parser shutdown...");
+ goto done;
+ }
+
+ if (unlikely(parser_action(parser, buffer))) {
+ internal_error(true, "parser_action() failed on keyword '%s'.", buffer);
break;
- int pos = 0;
- char *line;
- while ((line = receiver_next_line(rpt, &pos))) {
- if (unlikely(netdata_exit || rpt->shutdown || parser_action(parser, line)))
- goto done;
}
- rpt->last_msg_t = now_realtime_sec();
}
- while(!netdata_exit);
done:
+ internal_error(true, "Streaming receiver thread stopping...");
+
result = user.count;
// free parser with the pop function
@@ -417,6 +431,15 @@ done:
return result;
}
+static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) {
+ RRDSET *st;
+ rrdset_foreach_read(st, rpt->host) {
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ }
+ rrdset_foreach_done(st);
+ rrdhost_receiver_replicating_charts_zero(rpt->host);
+}
static int rrdpush_receive(struct receiver_state *rpt)
{
@@ -427,6 +450,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
char *rrdpush_destination = default_rrdpush_destination;
char *rrdpush_api_key = default_rrdpush_api_key;
char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
+ bool rrdpush_enable_replication = default_rrdpush_enable_replication;
+ time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate;
+ time_t rrdpush_replication_step = default_rrdpush_replication_step;
time_t alarms_delay = 60;
rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every);
@@ -439,13 +465,10 @@ static int rrdpush_receive(struct receiver_state *rpt)
mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode)));
mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode)));
-#ifndef ENABLE_DBENGINE
- if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE)) {
- close(rpt->fd);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "REJECTED -- DBENGINE MEMORY MODE NOT SUPPORTED");
- return 1;
+ if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
+ error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port);
+ mode = default_rrd_memory_mode;
}
-#endif
health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled);
health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled);
@@ -465,6 +488,15 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+ rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication);
+ rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication);
+
+ rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate);
+ rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate);
+
+ rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step);
+ rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step);
+
#ifdef ENABLE_COMPRESSION
unsigned int rrdpush_compression = default_compression_enabled;
rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression);
@@ -480,14 +512,12 @@ static int rrdpush_receive(struct receiver_state *rpt)
char initial_response[HTTP_HEADER_SIZE + 1];
snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
#ifdef ENABLE_HTTPS
- rpt->host->stream_ssl.conn = rpt->ssl.conn;
- rpt->host->stream_ssl.flags = rpt->ssl.flags;
if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
#endif
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
close(rpt->fd);
return 0;
}
@@ -516,6 +546,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rrdpush_destination
, rrdpush_api_key
, rrdpush_send_charts_matching
+ , rrdpush_enable_replication
+ , rrdpush_seconds_to_replicate
+ , rrdpush_replication_step
, rpt->system_info
, 0
);
@@ -561,6 +594,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdpush_destination,
rrdpush_api_key,
rrdpush_send_charts_matching,
+ rrdpush_enable_replication,
+ rrdpush_seconds_to_replicate,
+ rrdpush_replication_step,
rpt->system_info);
rrd_unlock();
}
@@ -575,14 +611,14 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rpt->hostname
, rpt->client_ip
, rpt->client_port
- , rpt->host->hostname
+ , rrdhost_hostname(rpt->host)
, rpt->host->machine_guid
, rpt->host->rrd_update_every
, rpt->host->rrd_history_entries
, rrd_memory_mode_name(rpt->host->rrd_memory_mode)
, (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
, ssl ? " SSL," : ""
- , rpt->host->tags?rpt->host->tags:""
+ , rrdhost_tags(rpt->host)
);
#endif // NETDATA_INTERNAL_CHECKS
@@ -596,7 +632,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
.obsolete = 0,
.started_t = now_realtime_sec(),
.next = NULL,
- .version = 0,
+ .capabilities = 0,
};
// put the client IP and port into the buffers used by plugins.d
@@ -605,60 +641,50 @@ static int rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
- info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
- char initial_response[HTTP_HEADER_SIZE];
- if (rpt->stream_version > 1) {
- if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){
#ifdef ENABLE_COMPRESSION
- if(!rpt->rrdpush_compression)
- rpt->stream_version = STREAM_VERSION_CLABELS;
-#else
- if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) {
- rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION;
- }
+ if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
+ if (!rpt->rrdpush_compression)
+ rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
+ }
#endif
- }
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
- sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
- } else if (rpt->stream_version == 1) {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
+
+ info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ char initial_response[HTTP_HEADER_SIZE];
+ if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities);
+ }
+ else if (stream_has_capability(rpt, STREAM_CAP_VN)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities));
+ } else if (stream_has_capability(rpt, STREAM_CAP_V2)) {
+ log_receiver_capabilities(rpt);
sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
- } else {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
- sprintf(initial_response, "%s", START_STREAMING_PROMPT);
+ } else { // stream_has_capability(rpt, STREAM_CAP_V1)
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1);
}
debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
- #ifdef ENABLE_HTTPS
- rpt->host->stream_ssl.conn = rpt->ssl.conn;
- rpt->host->stream_ssl.flags = rpt->ssl.flags;
+#ifdef ENABLE_HTTPS
if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
#endif
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
close(rpt->fd);
return 0;
}
// remove the non-blocking flag from the socket
if(sock_delnonblock(rpt->fd) < 0)
- error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
struct timeval timeout;
- timeout.tv_sec = 120;
+ timeout.tv_sec = 600;
timeout.tv_usec = 0;
if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0))
- error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
-
- // convert the socket to a FILE *
- FILE *fp = fdopen(rpt->fd, "r");
- if(!fp) {
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - SOCKET ERROR");
- error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
- close(rpt->fd);
- return 0;
- }
+ error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
rrdhost_wrlock(rpt->host);
/* if(rpt->host->connected_senders > 0) {
@@ -671,34 +697,29 @@ static int rrdpush_receive(struct receiver_state *rpt)
*/
// rpt->host->connected_senders++;
- if(rpt->stream_version > 0) {
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
- }
- else {
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
- rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- }
-
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
- info(
- "Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.",
- (int64_t)alarms_delay,
- rpt->host->hostname);
+ log_health(
+ "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
+ rrdhost_hostname(rpt->host),
+ (int64_t)alarms_delay);
}
}
rpt->host->senders_connect_time = now_realtime_sec();
rpt->host->senders_last_chart_command = 0;
rpt->host->trigger_chart_obsoletion_check = 1;
+
rrdhost_unlock(rpt->host);
// call the plugins.d processor to receive the metrics
- info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: receiving metrics...",
+ rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- cd.version = rpt->stream_version;
+ log_stream_connection(rpt->client_ip, rpt->client_port,
+ rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");
+
+ cd.capabilities = rpt->capabilities;
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
@@ -707,24 +728,42 @@ static int rrdpush_receive(struct receiver_state *rpt)
aclk_host_state_update(rpt->host, 1);
#endif
+ rrdhost_set_is_parent_label(++localhost->senders_count);
+
+ rrdpush_receiver_replication_reset(rpt);
rrdcontext_host_child_connected(rpt->host);
- size_t count = streaming_parser(rpt, &cd, fp);
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
+ size_t count = streaming_parser(rpt, &cd, rpt->fd,
+#ifdef ENABLE_HTTPS
+ (rpt->ssl.conn) ? &rpt->ssl : NULL
+#else
+ NULL
+#endif
+ );
+
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+
+ log_stream_connection(rpt->client_ip, rpt->client_port,
+ rpt->key, rpt->host->machine_guid, rpt->hostname,
"DISCONNECTED");
- error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
- rpt->client_port, count);
+
+ error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).",
+ rpt->hostname, rpt->client_ip, rpt->client_port, count);
rrdcontext_host_child_disconnected(rpt->host);
+ rrdpush_receiver_replication_reset(rpt);
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
- // new child connected
+ // a child disconnected
if (netdata_cloud_setting)
aclk_host_state_update(rpt->host, 0);
#endif
+ rrdhost_set_is_parent_label(--localhost->senders_count);
+
// During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
if (!netdata_exit && rpt->host) {
rrd_rdlock();
@@ -747,7 +786,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
// cleanup
- fclose(fp);
+ close(rpt->fd);
return (int)count;
}
@@ -758,6 +797,9 @@ void *rrdpush_receiver_thread(void *ptr) {
info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
worker_register("STREAMRCV");
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
rrdpush_receive(rpt);
worker_unregister();
diff --git a/streaming/replication.c b/streaming/replication.c
new file mode 100644
index 000000000..8fa501061
--- /dev/null
+++ b/streaming/replication.c
@@ -0,0 +1,1407 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "replication.h"
+#include "Judy.h"
+
+#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
+#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
+#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
+
+#define WORKER_JOB_FIND_NEXT 1
+#define WORKER_JOB_QUERYING 2
+#define WORKER_JOB_DELETE_ENTRY 3
+#define WORKER_JOB_FIND_CHART 4
+#define WORKER_JOB_CHECK_CONSISTENCY 5
+#define WORKER_JOB_BUFFER_COMMIT 6
+#define WORKER_JOB_CLEANUP 7
+
+// master thread worker jobs
+#define WORKER_JOB_STATISTICS 8
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 9
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 10
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 11
+#define WORKER_JOB_CUSTOM_METRIC_DONE 12
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 13
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 14
+#define WORKER_JOB_CUSTOM_METRIC_WAITS 15
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
+
+#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
+#define SECONDS_TO_RESET_POINT_IN_TIME 10
+
+static struct replication_query_statistics replication_queries = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
+ .queries_started = 0,
+ .queries_finished = 0,
+ .points_read = 0,
+ .points_generated = 0,
+};
+
+struct replication_query_statistics replication_get_query_statistics(void) {
+ netdata_spinlock_lock(&replication_queries.spinlock);
+ struct replication_query_statistics ret = replication_queries;
+ netdata_spinlock_unlock(&replication_queries.spinlock);
+ return ret;
+}
+
+// ----------------------------------------------------------------------------
+// sending replication replies
+
+struct replication_dimension {
+ STORAGE_POINT sp;
+ struct storage_engine_query_handle handle;
+ bool enabled;
+
+ DICTIONARY *dict;
+ const DICTIONARY_ITEM *rda;
+ RRDDIM *rd;
+};
+
+static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
+ size_t dimensions = rrdset_number_of_dimensions(st);
+ size_t points_read = 0, points_generated = 0;
+
+ struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
+ struct replication_dimension data[dimensions];
+ memset(data, 0, sizeof(data));
+
+ if(enable_streaming && st->last_updated.tv_sec > before) {
+ internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)before,
+ (unsigned long long)st->last_updated.tv_sec
+ );
+ before = st->last_updated.tv_sec;
+ }
+
+ // prepare our array of dimensions
+ {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(unlikely(!rd || !rd_dfe.item || !rd->exposed))
+ continue;
+
+ if (unlikely(rd_dfe.counter >= dimensions)) {
+ internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ break;
+ }
+
+ struct replication_dimension *d = &data[rd_dfe.counter];
+
+ d->dict = rd_dfe.dict;
+ d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
+ d->rd = rd;
+
+ ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before);
+ d->enabled = true;
+ }
+ rrddim_foreach_done(rd);
+ }
+
+ time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
+ while(now <= before) {
+ time_t min_start_time = 0, min_end_time = 0;
+ for (size_t i = 0; i < dimensions ;i++) {
+ struct replication_dimension *d = &data[i];
+ if(unlikely(!d->enabled)) continue;
+
+ // fetch the first valid point for the dimension
+ int max_skip = 100;
+ while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) {
+ d->sp = ops->next_metric(&d->handle);
+ points_read++;
+ }
+
+ internal_error(max_skip <= 0,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now);
+
+ if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp)))
+ continue;
+
+ if(unlikely(!min_start_time)) {
+ min_start_time = d->sp.start_time;
+ min_end_time = d->sp.end_time;
+ }
+ else {
+ min_start_time = MIN(min_start_time, d->sp.start_time);
+ min_end_time = MIN(min_end_time, d->sp.end_time);
+ }
+ }
+
+ if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)min_start_time,
+ (unsigned long long)min_end_time,
+ (unsigned long long)wall_clock_time);
+ break;
+ }
+
+ if(unlikely(min_end_time < now)) {
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true,
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
+ break;
+ }
+
+ if(unlikely(min_end_time <= min_start_time))
+ min_start_time = min_end_time - st->update_every;
+
+ if(unlikely(!actual_after)) {
+ actual_after = min_end_time;
+ actual_before = min_end_time;
+ }
+ else
+ actual_before = min_end_time;
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n"
+ , (unsigned long long)min_start_time
+ , (unsigned long long)min_end_time
+ , (unsigned long long)wall_clock_time
+ );
+
+ // output the replay values for this time
+ for (size_t i = 0; i < dimensions ;i++) {
+ struct replication_dimension *d = &data[i];
+ if(unlikely(!d->enabled)) continue;
+
+ if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time))
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
+ rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+
+ else
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
+ rrddim_id(d->rd));
+
+ points_generated++;
+ }
+
+ now = min_end_time + 1;
+ }
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ if(actual_after) {
+ char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
+ log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
+ log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
+ internal_error(true,
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
+ (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
+ }
+ else
+ internal_error(true,
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)after, (unsigned long long)before);
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
+
+ // release all the dictionary items acquired
+ // finalize the queries
+ size_t queries = 0;
+ for(size_t i = 0; i < dimensions ;i++) {
+ struct replication_dimension *d = &data[i];
+ if(unlikely(!d->enabled)) continue;
+
+ ops->finalize(&d->handle);
+
+ dictionary_acquired_item_release(d->dict, d->rda);
+
+ // update global statistics
+ queries++;
+ }
+
+ netdata_spinlock_lock(&replication_queries.spinlock);
+ replication_queries.queries_started += queries;
+ replication_queries.queries_finished += queries;
+ replication_queries.points_read += points_read;
+ replication_queries.points_generated += points_generated;
+ netdata_spinlock_unlock(&replication_queries.spinlock);
+
+ return before;
+}
+
+static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if(!rd->exposed) continue;
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
+ rrddim_id(rd),
+ (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
+ rd->last_collected_value,
+ rd->last_calculated_value,
+ rd->last_stored_value
+ );
+ }
+ rrddim_foreach_done(rd);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
+ (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
+ (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
+ );
+}
+
+bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) {
+ time_t query_after = after;
+ time_t query_before = before;
+ time_t now = now_realtime_sec();
+ time_t tolerance = 2; // sometimes from the time we get this value, to the time we check,
+ // a data collection has been made
+ // so, we give this tolerance to detect invalid timestamps
+
+ // find the first entry we have
+ time_t first_entry_local = rrdset_first_entry_t(st);
+ if(first_entry_local > now + tolerance) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)first_entry_local, (unsigned long long)now);
+ first_entry_local = now;
+ }
+
+ if (query_after < first_entry_local)
+ query_after = first_entry_local;
+
+ // find the latest entry we have
+ time_t last_entry_local = st->last_updated.tv_sec;
+ if(!last_entry_local) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ last_entry_local = rrdset_last_entry_t(st);
+ if(!last_entry_local) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ last_entry_local = now;
+ }
+ }
+
+ if(last_entry_local > now + tolerance) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long)last_entry_local, (unsigned long long)now);
+ last_entry_local = now;
+ }
+
+ if (query_before > last_entry_local)
+ query_before = last_entry_local;
+
+ // if the parent asked us to start streaming, then fill the rest with the data that we have
+ if (start_streaming)
+ query_before = last_entry_local;
+
+ if (query_after > query_before) {
+ time_t tmp = query_before;
+ query_before = query_after;
+ query_after = tmp;
+ }
+
+ bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false;
+
+ // we might want to optimize this by filling a temporary buffer
+ // and copying the result to the host's buffer in order to avoid
+ // holding the host's buffer lock for too long
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
+
+ if(after != 0 && before != 0)
+ before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now);
+ else {
+ after = 0;
+ before = 0;
+ enable_streaming = true;
+ }
+
+ // get again the world clock time
+ time_t world_clock_time = now_realtime_sec();
+ if(enable_streaming) {
+ if(now < world_clock_time) {
+ // we needed time to execute this request
+ // so, the parent will need to replicate more data
+ enable_streaming = false;
+ }
+ else
+ replicate_chart_collection_state(wb, st);
+ }
+
+ // end with first/last entries we have, and the first start time and
+ // last end time of the data we sent
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
+
+ // current chart update every
+ (int)st->update_every
+
+ // child first db time, child end db time
+ , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local
+
+ // start streaming boolean
+ , enable_streaming ? "true" : "false"
+
+ // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
+ , (unsigned long long)after, (unsigned long long)before
+
+ // child world clock time
+ , (unsigned long long)world_clock_time
+ );
+
+ worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
+ sender_commit(host->sender, wb);
+ worker_is_busy(WORKER_JOB_CLEANUP);
+
+ return enable_streaming;
+}
+
+// ----------------------------------------------------------------------------
+// sending replication requests
+
+struct replication_request_details {
+ struct {
+ send_command callback;
+ void *data;
+ } caller;
+
+ RRDHOST *host;
+ RRDSET *st;
+
+ struct {
+ time_t first_entry_t; // the first entry time the child has
+ time_t last_entry_t; // the last entry time the child has
+ time_t world_time_t; // the current time of the child
+ } child_db;
+
+ struct {
+ time_t first_entry_t; // the first entry time we have
+ time_t last_entry_t; // the last entry time we have
+ bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed
+ time_t now; // the current local world clock time
+ } local_db;
+
+ struct {
+ time_t from; // the starting time of the entire gap we have
+ time_t to; // the ending time of the entire gap we have
+ } gap;
+
+ struct {
+ time_t after; // the start time we requested previously from this child
+ time_t before; // the end time we requested previously from this child
+ } last_request;
+
+ struct {
+ time_t after; // the start time of this replication request - the child will add 1 second
+ time_t before; // the end time of this replication request
+ bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
+ } wanted;
+};
+
+static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
+ RRDSET *st = r->st;
+
+ if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
+ st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.log_next_data_collection = true;
+
+ char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";
+
+ if(r->wanted.after)
+ log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);
+
+ if(r->wanted.before)
+ log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);
+
+ internal_error(true,
+ "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
+ "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s"
+ , rrdhost_hostname(r->host), rrdset_id(r->st)
+ , r->wanted.after, wanted_after_buf
+ , r->wanted.before, wanted_before_buf
+ , r->wanted.start_streaming ? "YES" : "NO"
+ , msg
+ , r->last_request.after, r->last_request.before
+ , r->child_db.first_entry_t, r->child_db.last_entry_t
+ , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
+ , r->local_db.first_entry_t, r->local_db.last_entry_t
+ , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now
+ , r->gap.from, r->gap.to
+ , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
+ , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
+ );
+
+ st->replay.start_streaming = r->wanted.start_streaming;
+ st->replay.after = r->wanted.after;
+ st->replay.before = r->wanted.before;
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
+
+ char buffer[2048 + 1];
+ snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
+ rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
+ (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);
+
+ int ret = r->caller.callback(buffer, r->caller.data);
+ if (ret < 0) {
+ error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
+ rrdhost_hostname(r->host), rrdset_id(r->st), ret);
+ return false;
+ }
+
+ return true;
+}
+
+bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
+ time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
+ time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
+{
+ struct replication_request_details r = {
+ .caller = {
+ .callback = callback,
+ .data = callback_data,
+ },
+
+ .host = host,
+ .st = st,
+
+ .child_db = {
+ .first_entry_t = first_entry_child,
+ .last_entry_t = last_entry_child,
+ .world_time_t = child_world_time,
+ },
+
+ .local_db = {
+ .first_entry_t = rrdset_first_entry_t(st),
+ .last_entry_t = rrdset_last_entry_t(st),
+ .last_entry_t_adjusted_to_now = false,
+ .now = now_realtime_sec(),
+ },
+
+ .last_request = {
+ .after = prev_first_entry_wanted,
+ .before = prev_last_entry_wanted,
+ },
+
+ .wanted = {
+ .after = 0,
+ .before = 0,
+ .start_streaming = true,
+ },
+ };
+
+ // check our local database retention
+ if(r.local_db.last_entry_t > r.local_db.now) {
+ r.local_db.last_entry_t = r.local_db.now;
+ r.local_db.last_entry_t_adjusted_to_now = true;
+ }
+
+ // let's find the GAP we have
+ if(!r.last_request.after || !r.last_request.before) {
+ // there is no previous request
+
+ if(r.local_db.last_entry_t)
+ // we have some data, let's continue from the last point we have
+ r.gap.from = r.local_db.last_entry_t;
+ else
+ // we don't have any data, the gap is the max timeframe we are allowed to replicate
+ r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate;
+
+ }
+ else {
+ // we had sent a request - let's continue at the point we left it
+ // for this we don't take into account the actual data in our db
+ // because the child may also have gaps, and we need to get over it
+ r.gap.from = r.last_request.before;
+ }
+
+ // we want all the data up to now
+ r.gap.to = r.local_db.now;
+
+ // The gap is now r.gap.from -> r.gap.to
+
+ if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
+ return send_replay_chart_cmd(&r, "empty replication request, replication is disabled");
+
+ if (unlikely(!r.child_db.last_entry_t))
+ return send_replay_chart_cmd(&r, "empty replication request, child has no stored data");
+
+ if (unlikely(!rrdset_number_of_dimensions(st)))
+ return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions");
+
+ if (r.child_db.first_entry_t <= 0)
+ return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid");
+
+ if (r.child_db.first_entry_t > r.child_db.last_entry_t)
+ return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)");
+
+ if (r.local_db.last_entry_t > r.child_db.last_entry_t)
+ return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one");
+
+ // let's find what the child can provide to fill that gap
+
+ if(r.child_db.first_entry_t > r.gap.from)
+ // the child does not have all the data - let's get what it has
+ r.wanted.after = r.child_db.first_entry_t;
+ else
+ // ok, the child can fill the entire gap we have
+ r.wanted.after = r.gap.from;
+
+ if(r.gap.to - r.wanted.after > host->rrdpush_replication_step)
+ // the duration is too big for one request - let's take the first step
+ r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
+ else
+ // wow, we can do it in one request
+ r.wanted.before = r.gap.to;
+
+ // don't ask from the child more than it has
+ if(r.wanted.before > r.child_db.last_entry_t)
+ r.wanted.before = r.child_db.last_entry_t;
+
+ if(r.wanted.after > r.wanted.before)
+ r.wanted.after = r.wanted.before;
+
+ // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
+ r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t);
+
+ // the wanted timeframe is now r.wanted.after -> r.wanted.before
+ // send it
+ return send_replay_chart_cmd(&r, "OK");
+}
+
+// ----------------------------------------------------------------------------
+// replication thread
+
+// replication request in sender DICTIONARY
+// used for de-duplicating the requests
+struct replication_request {
+ struct sender_state *sender; // the sender we should put the reply at
+ STRING *chart_id; // the chart of the request
+ time_t after; // the start time of the query (maybe zero) key for sorting (JudyL)
+ time_t before; // the end time of the query (maybe zero)
+ bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
+
+ usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request
+ Word_t unique_id; // auto-increment, later requests have bigger
+ bool found; // used as a result boolean for the find call
+ bool indexed_in_judy; // true when the request is indexed in judy
+};
+
+// replication sort entry in JudyL array
+// used for sorting all requests, across all nodes
+struct replication_sort_entry {
+ struct replication_request *rq;
+
+ size_t unique_id; // used as a key to identify the sort entry - we never access its contents
+};
+
+#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread
+
+// the global variables for the replication thread
+static struct replication_thread {
+ netdata_mutex_t mutex;
+
+ struct {
+ size_t pending; // number of requests pending in the queue
+ Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
+
+ // statistics
+ size_t added; // number of requests added to the queue
+ size_t removed; // number of requests removed from the queue
+ size_t skipped_not_connected; // number of requests skipped, because the sender is not connected to a parent
+ size_t skipped_no_room; // number of requests skipped, because the sender has no room for responses
+// size_t skipped_no_room_since_last_reset;
+ size_t sender_resets; // number of times a sender reset our last position in the queue
+ time_t first_time_t; // the minimum 'after' we encountered
+
+ struct {
+ Word_t after;
+ Word_t unique_id;
+ Pvoid_t JudyL_array;
+ } queue;
+
+ } unsafe; // protected from replication_recursive_lock()
+
+ struct {
+ size_t executed; // the number of replication requests executed
+ size_t latest_first_time; // the 'after' timestamp of the last request we executed
+ } atomic; // access should be with atomic operations
+
+ struct {
+ size_t waits;
+ size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time
+
+ netdata_thread_t **threads_ptrs;
+ size_t threads;
+ } main_thread; // access is allowed only by the main thread
+
+} replication_globals = {
+ .mutex = NETDATA_MUTEX_INITIALIZER,
+ .unsafe = {
+ .pending = 0,
+ .unique_id = 0,
+
+ .added = 0,
+ .removed = 0,
+ .skipped_not_connected = 0,
+ .skipped_no_room = 0,
+// .skipped_no_room_since_last_reset = 0,
+ .sender_resets = 0,
+
+ .first_time_t = 0,
+
+ .queue = {
+ .after = 0,
+ .unique_id = 0,
+ .JudyL_array = NULL,
+ },
+ },
+ .atomic = {
+ .executed = 0,
+ .latest_first_time = 0,
+ },
+ .main_thread = {
+ .waits = 0,
+ .last_executed = 0,
+ .threads = 0,
+ .threads_ptrs = NULL,
+ },
+};
+
+#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
+#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
+
+static inline bool replication_recursive_lock_mode(char mode) {
+ static __thread int recursions = 0;
+
+ if(mode == 'L') { // (L)ock
+ if(++recursions == 1)
+ netdata_mutex_lock(&replication_globals.mutex);
+ }
+ else if(mode == 'U') { // (U)nlock
+ if(--recursions == 0)
+ netdata_mutex_unlock(&replication_globals.mutex);
+ }
+ else if(mode == 'C') { // (C)heck
+ if(recursions > 0)
+ return true;
+ else
+ return false;
+ }
+ else
+ fatal("REPLICATION: unknown lock mode '%c'", mode);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(recursions < 0)
+ fatal("REPLICATION: recursions is %d", recursions);
+#endif
+
+ return true;
+}
+
+#define replication_recursive_lock() replication_recursive_lock_mode('L')
+#define replication_recursive_unlock() replication_recursive_lock_mode('U')
+#define fatal_when_replication_is_not_locked_for_me() do { \
+ if(!replication_recursive_lock_mode('C')) \
+ fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
+} while(0)
+
+void replication_set_next_point_in_time(time_t after, size_t unique_id) {
+ replication_recursive_lock();
+ replication_globals.unsafe.queue.after = after;
+ replication_globals.unsafe.queue.unique_id = unique_id;
+ replication_recursive_unlock();
+}
+
+// ----------------------------------------------------------------------------
+// replication sort entry management
+
+static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
+ fatal_when_replication_is_not_locked_for_me();
+
+ struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
+
+ rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
+
+ // copy the request
+ rse->rq = rq;
+ rse->unique_id = ++replication_globals.unsafe.unique_id;
+
+ // save the unique id into the request, to be able to delete it later
+ rq->unique_id = rse->unique_id;
+ rq->indexed_in_judy = false;
+ return rse;
+}
+
+static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
+ freez(rse);
+}
+
+static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) {
+ replication_recursive_lock();
+
+ struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
+
+// if(rq->after < (time_t)replication_globals.protected.queue.after &&
+// rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
+// !replication_globals.protected.skipped_no_room_since_last_reset) {
+//
+// // make it find this request first
+// replication_set_next_point_in_time(rq->after, rq->unique_id);
+// }
+
+ replication_globals.unsafe.added++;
+ replication_globals.unsafe.pending++;
+
+ Pvoid_t *inner_judy_ptr;
+
+ // find the outer judy entry, using after as key
+ inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+ if(!inner_judy_ptr)
+ inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
+
+ // add it to the inner judy, using unique_id as key
+ Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
+ *item = rse;
+ rq->indexed_in_judy = true;
+
+ if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
+ replication_globals.unsafe.first_time_t = rq->after;
+
+ replication_recursive_unlock();
+
+ return rse;
+}
+
+static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
+ fatal_when_replication_is_not_locked_for_me();
+
+ bool inner_judy_deleted = false;
+
+ replication_globals.unsafe.removed++;
+ replication_globals.unsafe.pending--;
+
+ rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);
+
+ rse->rq->indexed_in_judy = false;
+
+ // delete it from the inner judy
+ JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
+
+ // if no items left, delete it from the outer judy
+ if(**inner_judy_ppptr == NULL) {
+ JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
+ inner_judy_deleted = true;
+ }
+
+ // free memory
+ replication_sort_entry_destroy(rse);
+
+ return inner_judy_deleted;
+}
+
+static void replication_sort_entry_del(struct replication_request *rq) {
+ Pvoid_t *inner_judy_pptr;
+ struct replication_sort_entry *rse_to_delete = NULL;
+
+ replication_recursive_lock();
+ if(rq->indexed_in_judy) {
+
+ inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0);
+ if (inner_judy_pptr) {
+ Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
+ if (our_item_pptr) {
+ rse_to_delete = *our_item_pptr;
+ replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr);
+ }
+ }
+
+ if (!rse_to_delete)
+ fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
+ rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
+
+ }
+
+ replication_recursive_unlock();
+}
+
+static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t * PIndex, bool first) {
+ if(unlikely(first))
+ return JudyLFirst(PArray, PIndex, PJE0);
+
+ return JudyLNext(PArray, PIndex, PJE0);
+}
+
+static struct replication_request replication_request_get_first_available() {
+ Pvoid_t *inner_judy_pptr;
+
+ replication_recursive_lock();
+
+ struct replication_request rq_to_return = (struct replication_request){ .found = false };
+
+ if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
+ replication_globals.unsafe.queue.after = 0;
+ replication_globals.unsafe.queue.unique_id = 0;
+ }
+
+ Word_t started_after = replication_globals.unsafe.queue.after;
+
+ size_t round = 0;
+ while(!rq_to_return.found) {
+ round++;
+
+ if(round > 2)
+ break;
+
+ if(round == 2) {
+ if(started_after == 0)
+ break;
+
+ replication_globals.unsafe.queue.after = 0;
+ replication_globals.unsafe.queue.unique_id = 0;
+ }
+
+ bool find_same_after = true;
+ while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, find_same_after))) {
+ Pvoid_t *our_item_pptr;
+
+ if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
+ break;
+
+ while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
+ struct replication_sort_entry *rse = *our_item_pptr;
+ struct replication_request *rq = rse->rq;
+ struct sender_state *s = rq->sender;
+
+ if (likely(rrdpush_sender_get_buffer_used_percent(s) <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) {
+ // there is room for this request in the sender buffer
+
+ bool sender_is_connected =
+ rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+
+ bool sender_has_been_flushed_since_this_request =
+ rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
+
+ if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
+ // skip this request, the sender is not connected, or it has reconnected
+
+ replication_globals.unsafe.skipped_not_connected++;
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ // we removed the item from the outer JudyL
+ break;
+ }
+ else {
+ // this request is good to execute
+
+ // copy the request to return it
+ rq_to_return = *rq;
+ rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
+
+ // set the return result to found
+ rq_to_return.found = true;
+
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ // we removed the item from the outer JudyL
+ break;
+ }
+ }
+ else {
+ replication_globals.unsafe.skipped_no_room++;
+// replication_globals.protected.skipped_no_room_since_last_reset++;
+ }
+ }
+
+ // call JudyLNext from now on
+ find_same_after = false;
+
+ // prepare for the next iteration on the outer loop
+ replication_globals.unsafe.queue.unique_id = 0;
+ }
+ }
+
+ replication_recursive_unlock();
+ return rq_to_return;
+}
+
+// ----------------------------------------------------------------------------
+// replication request management
+
+static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
+ struct sender_state *s = sender_state; (void)s;
+ struct replication_request *rq = value;
+
+ // IMPORTANT:
+ // We use the react instead of the insert callback
+ // because we want the item to be atomically visible
+ // to our replication thread, immediately after.
+
+ // If we put this at the insert callback, the item is not guaranteed
+ // to be atomically visible to others, so the replication thread
+ // may see the replication sort entry, but fail to find the dictionary item
+ // related to it.
+
+ replication_sort_entry_add(rq);
+
+ // this request is about a unique chart for this sender
+ rrdpush_sender_replicating_charts_plus_one(s);
+}
+
+static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
+ struct sender_state *s = sender_state; (void)s;
+ struct replication_request *rq = old_value; (void)rq;
+ struct replication_request *rq_new = new_value;
+
+ replication_recursive_lock();
+
+ if(!rq->indexed_in_judy) {
+ replication_sort_entry_add(rq);
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+ (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+ }
+ else {
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
+ dictionary_acquired_item_name(item),
+ (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false",
+ (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false");
+ }
+
+ replication_recursive_unlock();
+
+ string_freez(rq_new->chart_id);
+ return false;
+}
+
+static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
+ struct replication_request *rq = value;
+
+ // this request is about a unique chart for this sender
+ rrdpush_sender_replicating_charts_minus_one(rq->sender);
+
+ if(rq->indexed_in_judy)
+ replication_sort_entry_del(rq);
+
+ string_freez(rq->chart_id);
+}
+
+static bool replication_execute_request(struct replication_request *rq, bool workers) {
+ bool ret = false;
+
+ if(likely(workers))
+ worker_is_busy(WORKER_JOB_FIND_CHART);
+
+ RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
+ if(!st) {
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
+ rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));
+
+ goto cleanup;
+ }
+
+ if(likely(workers))
+ worker_is_busy(WORKER_JOB_QUERYING);
+
+ netdata_thread_disable_cancelability();
+
+ // send the replication data
+ bool start_streaming = replicate_chart_response(
+ st->rrdhost, st, rq->start_streaming, rq->after, rq->before);
+
+ netdata_thread_enable_cancelability();
+
+ if(start_streaming && rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender)) {
+ // enable normal streaming if we have to
+ // but only if the sender buffer has not been flushed since we started
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
+ }
+ else
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
+ rrdhost_hostname(st->rrdhost), string2str(rq->chart_id));
+ }
+
+ __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);
+
+ ret = true;
+
+cleanup:
+ string_freez(rq->chart_id);
+ return ret;
+}
+
+// ----------------------------------------------------------------------------
+// public API
+
+void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
+ struct replication_request rq = {
+ .sender = sender,
+ .chart_id = string_strdupz(chart_id),
+ .after = after,
+ .before = before,
+ .start_streaming = start_streaming,
+ .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
+ };
+
+ if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
+ replication_execute_request(&rq, false);
+
+ else
+ dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request));
+}
+
+void replication_sender_delete_pending_requests(struct sender_state *sender) {
+ // allow the dictionary destructor to go faster on locks
+ replication_recursive_lock();
+ dictionary_flush(sender->replication.requests);
+ replication_recursive_unlock();
+}
+
+void replication_init_sender(struct sender_state *sender) {
+ sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
+ dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
+ dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
+}
+
+void replication_cleanup_sender(struct sender_state *sender) {
+ // allow the dictionary destructor to go faster on locks
+ replication_recursive_lock();
+ dictionary_destroy(sender->replication.requests);
+ replication_recursive_unlock();
+}
+
+void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
+ size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
+ size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;
+
+ if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
+ s->replication.unsafe.reached_max = true;
+
+ if(s->replication.unsafe.reached_max &&
+ percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
+ s->replication.unsafe.reached_max = false;
+ replication_recursive_lock();
+// replication_set_next_point_in_time(0, 0);
+ replication_globals.unsafe.sender_resets++;
+ replication_recursive_unlock();
+ }
+
+ rrdpush_sender_set_buffer_used_percent(s, percentage);
+}
+
+// ----------------------------------------------------------------------------
+// replication thread
+
+static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
+ internal_error(
+ host->sender &&
+ !rrdpush_sender_pending_replication_requests(host->sender) &&
+ dictionary_entries(host->sender->replication.requests) != 0,
+ "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
+ rrdhost_hostname(host),
+ rrdpush_sender_pending_replication_requests(host->sender),
+ dictionary_entries(host->sender->replication.requests)
+ );
+
+ size_t ok = 0;
+ size_t errors = 0;
+
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+
+ bool is_error = false;
+
+ if(!flags) {
+ internal_error(
+ true,
+ "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
+ rrdhost_hostname(host), rrdset_id(st)
+ );
+ is_error = true;
+ }
+
+ if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ internal_error(
+ true,
+ "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
+ rrdhost_hostname(host), rrdset_id(st)
+ );
+ is_error = true;
+ }
+
+ if(is_error)
+ errors++;
+ else
+ ok++;
+ }
+ rrdset_foreach_done(st);
+
+ internal_error(errors,
+ "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
+ rrdhost_hostname(host), ok, errors);
+
+ return errors;
+}
+
+static void verify_all_hosts_charts_are_streaming_now(void) {
+ worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);
+
+ size_t errors = 0;
+ RRDHOST *host;
+ dfe_start_read(rrdhost_root_index, host)
+ errors += verify_host_charts_are_streaming_now(host);
+ dfe_done(host);
+
+ size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
+ info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
+ executed - replication_globals.main_thread.last_executed, errors);
+ replication_globals.main_thread.last_executed = executed;
+}
+
+static void replication_initialize_workers(bool master) {
+ worker_register("REPLICATION");
+ worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
+ worker_register_job_name(WORKER_JOB_QUERYING, "querying");
+ worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
+ worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
+ worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
+ worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
+ worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
+
+ if(master) {
+ worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ }
+}
+
+#define REQUEST_OK (0)
+#define REQUEST_QUEUE_EMPTY (-1)
+#define REQUEST_CHART_NOT_FOUND (-2)
+
+static int replication_execute_next_pending_request(void) {
+ worker_is_busy(WORKER_JOB_FIND_NEXT);
+ struct replication_request rq = replication_request_get_first_available();
+
+ if(unlikely(!rq.found))
+ return REQUEST_QUEUE_EMPTY;
+
+ // delete the request from the dictionary
+ worker_is_busy(WORKER_JOB_DELETE_ENTRY);
+ if(!dictionary_del(rq.sender->replication.requests, string2str(rq.chart_id)))
+ error("REPLAY ERROR: 'host:%s/chart:%s' failed to be deleted from sender pending charts index",
+ rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));
+
+ replication_set_latest_first_time(rq.after);
+
+ if(unlikely(!replication_execute_request(&rq, true)))
+ return REQUEST_CHART_NOT_FOUND;
+
+ return REQUEST_OK;
+}
+
+static void replication_worker_cleanup(void *ptr __maybe_unused) {
+ worker_unregister();
+}
+
+static void *replication_worker_thread(void *ptr) {
+ replication_initialize_workers(false);
+
+ netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
+
+ while(!netdata_exit) {
+ if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ worker_is_idle();
+ sleep_usec(1 * USEC_PER_SEC);
+ }
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
+
+static void replication_main_cleanup(void *ptr) {
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+ int threads = (int)replication_globals.main_thread.threads;
+ for(int i = 0; i < threads ;i++) {
+ netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
+ freez(replication_globals.main_thread.threads_ptrs[i]);
+ }
+ freez(replication_globals.main_thread.threads_ptrs);
+ replication_globals.main_thread.threads_ptrs = NULL;
+
+ // custom code
+ worker_unregister();
+
+ static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+void *replication_thread_main(void *ptr __maybe_unused) {
+ replication_initialize_workers(true);
+
+ int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
+ if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
+ error("replication threads given %d is invalid, resetting to 1", threads);
+ threads = 1;
+ }
+
+ if(--threads) {
+ replication_globals.main_thread.threads = threads;
+ replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
+
+ for(int i = 0; i < threads ;i++) {
+ replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
+ netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], "REPLICATION",
+ NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
+ }
+ }
+
+ netdata_thread_cleanup_push(replication_main_cleanup, ptr);
+
+ // start from 100% completed
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
+
+ long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
+ bool slow = true; // control the time we sleep - it has to start with true!
+ usec_t last_now_mono_ut = now_monotonic_usec();
+ time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds
+
+ size_t last_executed = 0;
+ size_t last_sender_resets = 0;
+
+ while(!netdata_exit) {
+
+ // statistics
+ usec_t now_mono_ut = now_monotonic_usec();
+ if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
+ last_now_mono_ut = now_mono_ut;
+
+ replication_recursive_lock();
+
+ size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
+ if(last_executed != current_executed) {
+ run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
+ last_executed = current_executed;
+ slow = false;
+ }
+
+ if(replication_reset_next_point_in_time_countdown-- == 0) {
+ // once per second, make it scan all the pending requests next time
+ replication_set_next_point_in_time(0, 0);
+// replication_globals.protected.skipped_no_room_since_last_reset = 0;
+ replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
+ }
+
+ if(!replication_globals.unsafe.pending && --run_verification_countdown == 0) {
+ // reset the statistics about completion percentage
+ replication_globals.unsafe.first_time_t = 0;
+ replication_set_latest_first_time(0);
+
+ verify_all_hosts_charts_are_streaming_now();
+
+ run_verification_countdown = LONG_MAX;
+ slow = true;
+ }
+
+ worker_is_busy(WORKER_JOB_STATISTICS);
+
+ time_t latest_first_time_t = replication_get_latest_first_time();
+ if(latest_first_time_t && replication_globals.unsafe.pending) {
+ // completion percentage statistics
+ time_t now = now_realtime_sec();
+ time_t total = now - replication_globals.unsafe.first_time_t;
+ time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t;
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
+ (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total);
+ }
+ else
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
+
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_not_connected);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.skipped_no_room);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
+ worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)replication_globals.main_thread.waits);
+
+ replication_recursive_unlock();
+ }
+
+ if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ replication_recursive_lock();
+
+ // the timeout also defines now frequently we will traverse all the pending requests
+ // when the outbound buffers of all senders is full
+ usec_t timeout;
+ if(slow)
+ // no work to be done, wait for a request to come in
+ timeout = 1000 * USEC_PER_MS;
+
+ else if(replication_globals.unsafe.pending > 0) {
+ if(replication_globals.unsafe.sender_resets == last_sender_resets) {
+ timeout = 1000 * USEC_PER_MS;
+ }
+ else {
+ // there are pending requests waiting to be executed,
+ // but none could be executed at this time.
+ // try again after this time.
+ timeout = 100 * USEC_PER_MS;
+ }
+
+ last_sender_resets = replication_globals.unsafe.sender_resets;
+ }
+ else {
+ // no requests pending, but there were requests recently (run_verification_countdown)
+ // so, try in a short time.
+ // if this is big, one chart replicating will be slow to finish (ping - pong just one chart)
+ timeout = 10 * USEC_PER_MS;
+ last_sender_resets = replication_globals.unsafe.sender_resets;
+ }
+
+ replication_globals.main_thread.waits++;
+ replication_recursive_unlock();
+
+ worker_is_idle();
+ sleep_usec(timeout);
+
+ // make it scan all the pending requests next time
+ replication_set_next_point_in_time(0, 0);
+ replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
+
+ continue;
+ }
+ }
+
+ netdata_thread_cleanup_pop(1);
+ return NULL;
+}
diff --git a/streaming/replication.h b/streaming/replication.h
new file mode 100644
index 000000000..00462cc3a
--- /dev/null
+++ b/streaming/replication.h
@@ -0,0 +1,33 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#include "daemon/common.h"
+
+struct replication_query_statistics {
+ SPINLOCK spinlock;
+ size_t queries_started;
+ size_t queries_finished;
+ size_t points_read;
+ size_t points_generated;
+};
+
+struct replication_query_statistics replication_get_query_statistics(void);
+
+bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before);
+
+typedef int (*send_command)(const char *txt, void *data);
+
+bool replicate_chart_request(send_command callback, void *callback_data,
+ RRDHOST *rh, RRDSET *rs,
+ time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
+ time_t response_first_start_time, time_t response_last_end_time);
+
+void replication_init_sender(struct sender_state *sender);
+void replication_cleanup_sender(struct sender_state *sender);
+void replication_sender_delete_pending_requests(struct sender_state *sender);
+void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming);
+void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s);
+
+#endif /* REPLICATION_H */
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index b73f24633..a57f1b080 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -11,8 +11,8 @@
* 1. a random data collection thread, calling rrdset_done_push()
* this is called for each chart.
*
- * the output of this work is kept in a BUFFER in RRDHOST
- * the sender thread is signalled via a pipe (also in RRDHOST)
+ * the output of this work is kept in a thread BUFFER
+ * the sender thread is signalled via a pipe (in RRDHOST)
*
* 2. a sender thread running at the sending netdata
* this is spawned automatically on the first chart to be pushed
@@ -46,6 +46,9 @@ unsigned int default_compression_enabled = 1;
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
char *default_rrdpush_send_charts_matching = NULL;
+bool default_rrdpush_enable_replication = true;
+time_t default_rrdpush_seconds_to_replicate = 86400;
+time_t default_rrdpush_replication_step = 600;
#ifdef ENABLE_HTTPS
int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
char *netdata_ssl_ca_path = NULL;
@@ -66,6 +69,31 @@ static void load_stream_conf() {
freez(filename);
}
+bool rrdpush_receiver_needs_dbengine() {
+ struct section *co;
+
+ for(co = stream_config.first_section; co; co = co->next) {
+ if(strcmp(co->name, "stream") == 0)
+ continue; // the first section is not relevant
+
+ char *s;
+
+ s = appconfig_get_by_section(co, "enabled", NULL);
+ if(!s || !appconfig_test_boolean_value(s))
+ continue;
+
+ s = appconfig_get_by_section(co, "default memory mode", NULL);
+ if(s && strcmp(s, "dbengine") == 0)
+ return true;
+
+ s = appconfig_get_by_section(co, "memory mode", NULL);
+ if(s && strcmp(s, "dbengine") == 0)
+ return true;
+ }
+
+ return false;
+}
+
int rrdpush_init() {
// --------------------------------------------------------------------
// load stream.conf
@@ -75,6 +103,11 @@ int rrdpush_init() {
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
+
+ default_rrdpush_enable_replication = config_get_boolean(CONFIG_SECTION_DB, "enable replication", default_rrdpush_enable_replication);
+ default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate);
+ default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step);
+
rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time);
#ifdef ENABLE_COMPRESSION
@@ -101,14 +134,14 @@ int rrdpush_init() {
bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO);
if(invalid_certificate == CONFIG_BOOLEAN_YES){
- if(netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
+ if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
info("Netdata is configured to accept invalid SSL certificate.");
- netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
+ netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
}
}
- netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", "/etc/ssl/certs/");
- netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", "/etc/ssl/certs/certs.pem");
+ netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
+ netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
#endif
return default_rrdpush_enabled;
@@ -128,30 +161,31 @@ int rrdpush_init() {
// this is for the first iterations of each chart
unsigned int remote_clock_resync_iterations = 60;
-
-static inline int should_send_chart_matching(RRDSET *st) {
- // Do not stream anomaly rates charts.
- if (unlikely(st->state->is_ar_chart))
+static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
+ if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
return false;
- if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION))
- return ml_streaming_enabled();
-
- if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
+ if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) {
RRDHOST *host = st->rrdhost;
- if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) ||
- simple_pattern_matches(host->rrdpush_send_charts_matching, st->name)) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
- rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ if (flags & RRDSET_FLAG_ANOMALY_DETECTION) {
+ if(ml_streaming_enabled())
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ else
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
}
- else {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
+ simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st)))
+
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ else
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
- }
+
+ // get the flags again, to know how to respond
+ flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE);
}
- return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND));
+ return flags & RRDSET_FLAG_UPSTREAM_SEND;
}
int configured_as_parent() {
@@ -173,42 +207,25 @@ int configured_as_parent() {
return is_parent;
}
-// checks if the current chart definition has been sent
-static inline int need_to_send_chart_definition(RRDSET *st) {
- rrdset_check_rdlock(st);
-
- if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))))
- return 1;
-
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(unlikely(!rd->exposed)) {
- #ifdef NETDATA_INTERNAL_CHECKS
- info("host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", st->rrdhost->hostname, st->id, rd->id);
- #endif
- return 1;
- }
- }
-
- return 0;
-}
-
// chart labels
static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
BUFFER *wb = (BUFFER *)data;
buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls);
return 1;
}
-void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) {
- if (st->state && st->state->chart_labels) {
- if(rrdlabels_walkthrough_read(st->state->chart_labels, send_clabels_callback, host->sender->build) > 0)
- buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n");
+
+static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
+ if (st->rrdlabels) {
+ if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0)
+ buffer_sprintf(wb, "CLABEL_COMMIT\n");
}
}
// Send the current chart definition.
// Assumes that collector thread has already called sender_start for mutex / buffer state.
-static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
+static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
+ bool replication_progress = false;
+
RRDHOST *host = st->rrdhost;
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
@@ -216,9 +233,9 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
// properly set the name for the remote end to parse it
char *name = "";
if(likely(st->name)) {
- if(unlikely(strcmp(st->id, st->name))) {
+ if(unlikely(st->id != st->name)) {
// they differ
- name = strchr(st->name, '.');
+ name = strchr(rrdset_name(st), '.');
if(name)
name++;
else
@@ -228,14 +245,14 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
// send the chart
buffer_sprintf(
- host->sender->build
+ wb
, "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
- , st->id
+ , rrdset_id(st)
, name
- , st->title
- , st->units
- , st->family
- , st->context
+ , rrdset_title(st)
+ , rrdset_units(st)
+ , rrdset_family(st)
+ , rrdset_context(st)
, rrdset_type_name(st->chart_type)
, st->priority
, st->update_every
@@ -243,120 +260,190 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
, rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
, rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
, rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
- , (st->plugin_name)?st->plugin_name:""
- , (st->module_name)?st->module_name:""
+ , rrdset_plugin_name(st)
+ , rrdset_module_name(st)
);
// send the chart labels
- if (host->sender->version >= STREAM_VERSION_CLABELS)
- rrdpush_send_clabels(host, st);
+ if (stream_has_capability(host->sender, STREAM_CAP_CLABELS))
+ rrdpush_send_clabels(wb, st);
// send the dimensions
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
buffer_sprintf(
- host->sender->build
+ wb
, "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
- , rd->id
- , rd->name
+ , rrddim_id(rd)
+ , rrddim_name(rd)
, rrd_algorithm_name(rd->algorithm)
, rd->multiplier
, rd->divisor
, rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
- , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
- , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
+ , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
rd->exposed = 1;
}
+ rrddim_foreach_done(rd);
+
+ // send the chart functions
+ if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
+ rrd_functions_expose_rrdpush(st, wb);
// send the chart local custom variables
- RRDSETVAR *rs;
- for(rs = st->variables; rs ;rs = rs->next) {
- if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) {
- NETDATA_DOUBLE *value = (NETDATA_DOUBLE *) rs->value;
-
- buffer_sprintf(
- host->sender->build
- , "VARIABLE CHART %s = " NETDATA_DOUBLE_FORMAT "\n"
- , rs->variable
- , *value
- );
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
+
+ if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
+ time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0);
+ time_t last_entry_local = st->last_updated.tv_sec;
+
+ if(unlikely(!last_entry_local))
+ last_entry_local = rrdset_last_entry_t(st);
+
+ time_t now = now_realtime_sec();
+ if(unlikely(last_entry_local > now)) {
+ internal_error(true,
+ "RRDSET REPLAY ERROR: 'host:%s/chart:%s' last updated time %ld is in the future, adjusting it to now %ld",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ last_entry_local, now);
+ last_entry_local = now;
+ }
+
+ if(unlikely(first_entry_local && last_entry_local && first_entry_local >= last_entry_local)) {
+ internal_error(true,
+ "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first updated time %ld is equal or bigger than last updated time %ld, adjusting it last updated time - update every",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ first_entry_local, last_entry_local);
+ first_entry_local = last_entry_local - st->update_every;
+ }
+
+ if(unlikely(!first_entry_local && last_entry_local)) {
+ internal_error(true,
+ "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first time %ld, last time %ld, setting both to last time",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ first_entry_local, last_entry_local);
+ first_entry_local = last_entry_local;
}
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n",
+ (unsigned long long)first_entry_local,
+ (unsigned long long)last_entry_local,
+ (unsigned long long)now);
+
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+ replication_progress = true;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
}
st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
+ return replication_progress;
}
// sends the current chart dimensions
-static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) {
- RRDHOST *host = st->rrdhost;
- buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
- if (s->version >= VERSION_GAP_FILLING)
- buffer_sprintf(host->sender->build, " %"PRId64"\n", (int64_t)st->last_collected_time.tv_sec);
+static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) {
+ buffer_fast_strcat(wb, "BEGIN \"", 7);
+ buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+ buffer_fast_strcat(wb, "\" ", 2);
+
+ if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time)
+ buffer_print_llu(wb, st->usec_since_last_update);
else
- buffer_strcat(host->sender->build, "\n");
+ buffer_fast_strcat(wb, "0", 1);
+
+ buffer_fast_strcat(wb, "\n", 1);
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(rd->updated && rd->exposed)
- buffer_sprintf(host->sender->build
- , "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n"
- , rd->id
- , rd->collected_value
- );
+ if(unlikely(!rd->updated))
+ continue;
+
+ if(likely(rd->exposed)) {
+ buffer_fast_strcat(wb, "SET \"", 5);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "\" = ", 4);
+ buffer_print_ll(wb, rd->collected_value);
+ buffer_fast_strcat(wb, "\n", 1);
+ }
+ else {
+ internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
+ // we will include it in the next iteration
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ }
}
- buffer_strcat(host->sender->build, "END\n");
+ rrddim_foreach_done(rd);
+
+ if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
+
+ buffer_fast_strcat(wb, "END\n", 4);
}
static void rrdpush_sender_thread_spawn(RRDHOST *host);
// Called from the internal collectors to mark a chart obsolete.
-void rrdset_push_chart_definition_now(RRDSET *st) {
+bool rrdset_push_chart_definition_now(RRDSET *st) {
RRDHOST *host = st->rrdhost;
- if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st)))
- return;
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
+ || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST))))
+ return false;
+
+ BUFFER *wb = sender_start(host->sender);
+ rrdpush_send_chart_definition(wb, st);
+ sender_commit(host->sender, wb);
- rrdset_rdlock(st);
- sender_start(host->sender);
- rrdpush_send_chart_definition_nolock(st);
- sender_commit(host->sender);
- rrdset_unlock(st);
+ return true;
}
void rrdset_done_push(RRDSET *st) {
- if(unlikely(!should_send_chart_matching(st)))
- return;
-
RRDHOST *host = st->rrdhost;
- if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
- rrdpush_sender_thread_spawn(host);
+ // fetch the flags we need to check with one atomic operation
+ RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST);
+
+ // check if we are not connected
+ if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
+
+ if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED))))
+ rrdpush_sender_thread_spawn(host);
+
+ if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
+ error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+ }
- // Handle non-connected case
- if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) {
- if(unlikely(!host->rrdpush_sender_error_shown))
- error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname);
- host->rrdpush_sender_error_shown = 1;
return;
}
- else if(unlikely(host->rrdpush_sender_error_shown)) {
- info("STREAM %s [send]: sending metrics...", host->hostname);
- host->rrdpush_sender_error_shown = 0;
+ else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
+ info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
- sender_start(host->sender);
+ RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
+ bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
+ bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- if(need_to_send_chart_definition(st))
- rrdpush_send_chart_definition_nolock(st);
+ if(unlikely((exposed_upstream && replication_in_progress) ||
+ !should_send_chart_matching(st, rrdset_flags)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
- rrdpush_send_chart_metrics_nolock(st, host->sender);
+ if(unlikely(!exposed_upstream))
+ replication_in_progress = rrdpush_send_chart_definition(wb, st);
- // signal the sender there are more data
- if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
- error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+ if (likely(!replication_in_progress))
+ rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags);
- sender_commit(host->sender);
+ sender_commit(host->sender, wb);
}
// labels
@@ -365,45 +452,38 @@ static int send_labels_callback(const char *name, const char *value, RRDLABEL_SR
buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
return 1;
}
-void rrdpush_send_labels(RRDHOST *host) {
- if (!host->host_labels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP)))
+void rrdpush_send_host_labels(RRDHOST *host) {
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
+ || !stream_has_capability(host->sender, STREAM_CAP_HLABELS)))
return;
- sender_start(host->sender);
-
- rrdlabels_walkthrough_read(host->host_labels, send_labels_callback, host->sender->build);
- buffer_sprintf(host->sender->build, "OVERWRITE %s\n", "labels");
- sender_commit(host->sender);
+ BUFFER *wb = sender_start(host->sender);
- if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
- error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+ rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb);
+ buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
- rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ sender_commit(host->sender, wb);
}
void rrdpush_claimed_id(RRDHOST *host)
{
- if(unlikely(!host->rrdpush_send_enabled || !__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)))
- return;
-
- if(host->sender->version < STREAM_VERSION_CLAIM)
+ if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
- sender_start(host->sender);
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
rrdhost_aclk_state_lock(host);
- buffer_sprintf(host->sender->build, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
+ buffer_sprintf(wb, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
rrdhost_aclk_state_unlock(host);
- sender_commit(host->sender);
-
- // signal the sender there are more data
- if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
- error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+ sender_commit(host->sender, wb);
}
int connect_to_one_of_destinations(
- struct rrdpush_destinations *destinations,
+ RRDHOST *host,
int default_port,
struct timeval *timeout,
size_t *reconnects_counter,
@@ -413,28 +493,44 @@ int connect_to_one_of_destinations(
{
int sock = -1;
- for (struct rrdpush_destinations *d = destinations; d; d = d->next) {
- if (d->disabled_no_proper_reply) {
- d->disabled_no_proper_reply = 0;
- continue;
- } else if (d->disabled_because_of_localhost) {
- continue;
- } else if (d->disabled_already_streaming && (d->disabled_already_streaming + 30 > now_realtime_sec())) {
- continue;
- } else if (d->disabled_because_of_denied_access) {
- d->disabled_because_of_denied_access = 0;
+ for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) {
+ time_t now = now_realtime_sec();
+
+ if(d->postpone_reconnection_until > now) {
+ info(
+ "STREAM %s: skipping destination '%s' (default port: %d) due to last error (code: %d, %s), will retry it in %d seconds",
+ rrdhost_hostname(host),
+ string2str(d->destination),
+ default_port,
+ d->last_handshake, d->last_error?d->last_error:"unset reason description",
+ (int)(d->postpone_reconnection_until - now));
+
continue;
}
+ info(
+ "STREAM %s: attempting to connect to '%s' (default port: %d)...",
+ rrdhost_hostname(host),
+ string2str(d->destination),
+ default_port);
+
if (reconnects_counter)
*reconnects_counter += 1;
- sock = connect_to_this(d->destination, default_port, timeout);
+
+ sock = connect_to_this(string2str(d->destination), default_port, timeout);
+
if (sock != -1) {
- if (connected_to && connected_to_size) {
- strncpy(connected_to, d->destination, connected_to_size);
- connected_to[connected_to_size - 1] = '\0';
- }
+ if (connected_to && connected_to_size)
+ strncpyz(connected_to, string2str(d->destination), connected_to_size);
+
*destination = d;
+
+ // move the current item to the end of the list
+ // without this, this destination will break the loop again and again
+ // not advancing the destinations to find one that may work
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, d, prev, next);
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(host->destinations, d, prev, next);
+
break;
}
}
@@ -442,44 +538,51 @@ int connect_to_one_of_destinations(
return sock;
}
-struct rrdpush_destinations *destinations_init(const char *dests) {
- const char *s = dests;
- struct rrdpush_destinations *destinations = NULL, *prev = NULL;
- while(*s) {
- const char *e = s;
-
- // skip path, moving both s(tart) and e(nd)
- if(*e == '/')
- while(!isspace(*e) && *e != ',') s = ++e;
-
- // skip separators, moving both s(tart) and e(nd)
- while(isspace(*e) || *e == ',') s = ++e;
-
- // move e(nd) to the first separator
- while(*e && !isspace(*e) && *e != ',' && *e != '/') e++;
-
- // is there anything?
- if(!*s || s == e) break;
-
- char buf[e - s + 1];
- strncpyz(buf, s, e - s);
- struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
- strncpyz(d->destination, buf, sizeof(d->destination)-1);
- d->disabled_no_proper_reply = 0;
- d->disabled_because_of_localhost = 0;
- d->disabled_already_streaming = 0;
- d->disabled_because_of_denied_access = 0;
- d->next = NULL;
- if (!destinations) {
- destinations = d;
- } else {
- prev->next = d;
- }
- prev = d;
+struct destinations_init_tmp {
+ RRDHOST *host;
+ struct rrdpush_destinations *list;
+ int count;
+};
+
+bool destinations_init_add_one(char *entry, void *data) {
+ struct destinations_init_tmp *t = data;
+
+ struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
+ d->destination = string_strdupz(entry);
+
+ DOUBLE_LINKED_LIST_APPEND_UNSAFE(t->list, d, prev, next);
+
+ t->count++;
+ info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
+
+ return false; // we return false, so that we will get all defined destinations
+}
+
+void rrdpush_destinations_init(RRDHOST *host) {
+ if(!host->rrdpush_send_destination) return;
+
+ rrdpush_destinations_free(host);
+
+ struct destinations_init_tmp t = {
+ .host = host,
+ .list = NULL,
+ .count = 0,
+ };
+
+ foreach_entry_in_connection_string(host->rrdpush_send_destination, destinations_init_add_one, &t);
- s = e;
+ host->destinations = t.list;
+}
+
+void rrdpush_destinations_free(RRDHOST *host) {
+ while (host->destinations) {
+ struct rrdpush_destinations *tmp = host->destinations;
+ DOUBLE_LINKED_LIST_REMOVE_UNSAFE(host->destinations, tmp, prev, next);
+ string_freez(tmp->destination);
+ freez(tmp);
}
- return destinations;
+
+ host->destinations = NULL;
}
// ----------------------------------------------------------------------------
@@ -495,11 +598,13 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
netdata_mutex_lock(&host->sender->mutex);
netdata_thread_t thr = 0;
- if(host->rrdpush_sender_spawn) {
- info("STREAM %s [send]: signaling sending thread to stop...", host->hostname);
+ if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
+
+ info("STREAM %s [send]: signaling sending thread to stop...", rrdhost_hostname(host));
// signal the thread that we want to join it
- host->rrdpush_sender_join = 1;
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN);
// copy the thread id, so that we will be waiting for the right one
// even if a new one has been spawn
@@ -512,10 +617,10 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
netdata_mutex_unlock(&host->sender->mutex);
if(thr != 0) {
- info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname);
+ info("STREAM %s [send]: waiting for the sending thread to stop...", rrdhost_hostname(host));
void *result;
netdata_thread_join(thr, &result);
- info("STREAM %s [send]: sending thread has exited.", host->hostname);
+ info("STREAM %s [send]: sending thread has exited.", rrdhost_hostname(host));
}
}
@@ -531,15 +636,16 @@ void log_stream_connection(const char *client_ip, const char *client_port, const
static void rrdpush_sender_thread_spawn(RRDHOST *host) {
netdata_mutex_lock(&host->sender->mutex);
- if(!host->rrdpush_sender_spawn) {
+ if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname);
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host));
if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender))
- error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
+ error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
- host->rrdpush_sender_spawn = 1;
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
}
+
netdata_mutex_unlock(&host->sender->mutex);
}
@@ -608,7 +714,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
else if(!strcmp(name, "tags"))
tags = value;
else if(!strcmp(name, "ver"))
- stream_version = MIN((uint32_t) strtoul(value, NULL, 0), STREAMING_PROTOCOL_CURRENT_VERSION);
+ stream_version = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
else {
// An old Netdata child does not have a compatible streaming protocol, map to something sane.
if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
@@ -624,7 +730,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
name = "NETDATA_HOST_OS_DETECTION";
else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) {
- stream_version = 1;
+ stream_version = convert_stream_version_to_capabilities(1);
}
if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
@@ -635,7 +741,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
}
if (stream_version == UINT_MAX)
- stream_version = 0;
+ stream_version = convert_stream_version_to_capabilities(0);
if(!key || !*key) {
rrdhost_system_info_free(system_info);
@@ -660,21 +766,30 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
if(regenerate_guid(key, buf) == -1) {
rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID KEY");
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID KEY");
error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key);
return rrdpush_receiver_permission_denied(w);
}
if(regenerate_guid(machine_guid, buf) == -1) {
rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID MACHINE GUID");
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID MACHINE GUID");
error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid);
return rrdpush_receiver_permission_denied(w);
}
+ const char *api_key_type = appconfig_get(&stream_config, key, "type", "api");
+ if(!api_key_type || !*api_key_type) api_key_type = "unknown";
+ if(strcmp(api_key_type, "api") != 0) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - API KEY GIVEN IS NOT API KEY");
+ error("STREAM [receive from [%s]:%s]: API key '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, key, api_key_type);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) {
rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - KEY NOT ENABLED");
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ENABLED");
error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
return rrdpush_receiver_permission_denied(w);
}
@@ -685,7 +800,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
simple_pattern_free(key_allow_from);
rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP");
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP");
error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key);
return rrdpush_receiver_permission_denied(w);
}
@@ -693,9 +808,18 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
}
}
+ const char *machine_guid_type = appconfig_get(&stream_config, machine_guid, "type", "machine");
+ if(!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown";
+ if(strcmp(machine_guid_type, "machine") != 0) {
+ rrdhost_system_info_free(system_info);
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID GIVEN IS NOT A MACHINE GUID");
+ error("STREAM [receive from [%s]:%s]: machine GUID '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid, machine_guid_type);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) {
rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - MACHINE GUID NOT ENABLED");
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ENABLED");
error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
return rrdpush_receiver_permission_denied(w);
}
@@ -706,7 +830,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
simple_pattern_free(machine_allow_from);
rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP");
+ log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP");
error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid);
return rrdpush_receiver_permission_denied(w);
}
@@ -746,7 +870,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
struct receiver_state *rpt = callocz(1, sizeof(*rpt));
rrd_rdlock();
- RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
+ RRDHOST *host = rrdhost_find_by_guid(machine_guid);
if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
host = NULL;
if (host) {
@@ -763,7 +887,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
info(
"STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - "
"existing connection is dead (%"PRId64" sec), accepting new connection.",
- host->hostname,
+ rrdhost_hostname(host),
w->client_ip,
w->client_port,
(int64_t)age);
@@ -772,12 +896,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
netdata_mutex_unlock(&host->receiver_lock);
rrdhost_unlock(host);
rrd_unlock();
- log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname,
+ log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, rrdhost_hostname(host),
"REJECTED - ALREADY CONNECTED");
info(
"STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - "
"existing connection is active (within last %"PRId64" sec), rejecting new connection.",
- host->hostname,
+ rrdhost_hostname(host),
w->client_ip,
w->client_port,
(int64_t)age);
@@ -811,7 +935,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
rpt->client_port = strdupz(w->client_port);
rpt->update_every = update_every;
rpt->system_info = system_info;
- rpt->stream_version = stream_version;
+ rpt->capabilities = stream_version;
#ifdef ENABLE_HTTPS
rpt->ssl.conn = w->ssl.conn;
rpt->ssl.flags = w->ssl.flags;
@@ -855,3 +979,66 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
buffer_flush(w->response.data);
return 200;
}
+
+static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
+ if(caps & STREAM_CAP_V1) buffer_strcat(wb, "V1 ");
+ if(caps & STREAM_CAP_V2) buffer_strcat(wb, "V2 ");
+ if(caps & STREAM_CAP_VN) buffer_strcat(wb, "VN ");
+ if(caps & STREAM_CAP_VCAPS) buffer_strcat(wb, "VCAPS ");
+ if(caps & STREAM_CAP_HLABELS) buffer_strcat(wb, "HLABELS ");
+ if(caps & STREAM_CAP_CLAIM) buffer_strcat(wb, "CLAIM ");
+ if(caps & STREAM_CAP_CLABELS) buffer_strcat(wb, "CLABELS ");
+ if(caps & STREAM_CAP_COMPRESSION) buffer_strcat(wb, "COMPRESSION ");
+ if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS ");
+ if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION ");
+ if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY ");
+}
+
+void log_receiver_capabilities(struct receiver_state *rpt) {
+ BUFFER *wb = buffer_create(100);
+ stream_capabilities_to_string(wb, rpt->capabilities);
+
+ info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
+
+ buffer_free(wb);
+}
+
+void log_sender_capabilities(struct sender_state *s) {
+ BUFFER *wb = buffer_create(100);
+ stream_capabilities_to_string(wb, s->capabilities);
+
+ info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
+
+ buffer_free(wb);
+}
+
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
+ STREAM_CAPABILITIES caps = 0;
+
+ if(version <= 1) caps = STREAM_CAP_V1;
+ else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS;
+ else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM;
+ else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS;
+ else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION;
+ else caps = version;
+
+ if(caps & STREAM_CAP_VCAPS)
+ caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2|STREAM_CAP_VN);
+
+ if(caps & STREAM_CAP_VN)
+ caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2);
+
+ if(caps & STREAM_CAP_V2)
+ caps &= ~(STREAM_CAP_V1);
+
+ return caps & STREAM_OUR_CAPABILITIES;
+}
+
+int32_t stream_capabilities_to_vn(uint32_t caps) {
+ if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION;
+ if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS;
+ return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM)
+}
+
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 1eb39cc6c..c5f7618c1 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,32 +10,83 @@
#define CONNECTED_TO_SIZE 100
-#define STREAM_VERSION_CLAIM 3
-#define STREAM_VERSION_CLABELS 4
-#define STREAM_VERSION_COMPRESSION 5
-#define VERSION_GAP_FILLING 6
+// ----------------------------------------------------------------------------
+// obsolete versions - do not use anymore
+
+#define STREAM_OLD_VERSION_CLAIM 3
+#define STREAM_OLD_VERSION_CLABELS 4
+#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
+
+// ----------------------------------------------------------------------------
+// capabilities negotiation
+
+typedef enum {
+ // do not use the first 3 bits
+ STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol
+ STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels)
+ STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol)
+ // v3 = claiming supported
+ // v4 = chart labels supported
+ // v5 = lz4 compression supported
+ STREAM_CAP_VCAPS = (1 << 6), // capabilities negotiation supported
+ STREAM_CAP_HLABELS = (1 << 7), // host labels supported
+ STREAM_CAP_CLAIM = (1 << 8), // claiming supported
+ STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
+ STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported
+ STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
+ STREAM_CAP_REPLICATION = (1 << 12), // replication supported
+ STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
+
+ // this must be signed int, so don't use the last bit
+ // needed for negotiating errors between parent and child
+} STREAM_CAPABILITIES;
#ifdef ENABLE_COMPRESSION
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_COMPRESSION)
+#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
#else
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_CLABELS)
-#endif //ENABLE_COMPRESSION
+#define STREAM_HAS_COMPRESSION 0
+#endif // ENABLE_COMPRESSION
+
+#define STREAM_OUR_CAPABILITIES ( \
+ STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \
+ STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \
+ STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY )
+
+#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
+
+// ----------------------------------------------------------------------------
+// stream handshake
+
+#define HTTP_HEADER_SIZE 8192
#define STREAMING_PROTOCOL_VERSION "1.1"
-#define START_STREAMING_PROMPT "Hit me baby, push them over..."
-#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
+#define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
+#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
-#define HTTP_HEADER_SIZE 8192
-
typedef enum {
- RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
- RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
-} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
+ STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
+ STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
+ STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
+ STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
+ STREAM_HANDSHAKE_OK_V1 = 1,
+ STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
+ STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
+ STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
+ STREAM_HANDSHAKE_ERROR_DENIED = -4,
+ STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
+ STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
+ STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
+ STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
+ STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
+} STREAM_HANDSHAKE;
+
+
+// ----------------------------------------------------------------------------
typedef struct {
char *os_name;
@@ -47,8 +98,8 @@ typedef struct {
#ifdef ENABLE_COMPRESSION
struct compressor_state {
- char *buffer;
- size_t buffer_size;
+ char *compression_result_buffer;
+ size_t compression_result_buffer_size;
struct compressor_data *data; // Compression API specific data
void (*reset)(struct compressor_state *state);
size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
@@ -56,21 +107,14 @@ struct compressor_state {
};
struct decompressor_state {
- char *buffer;
- size_t buffer_size;
- size_t buffer_len;
- size_t buffer_pos;
- char *out_buffer;
- size_t out_buffer_len;
- size_t out_buffer_pos;
+ size_t signature_size;
size_t total_compressed;
size_t total_uncompressed;
size_t packet_count;
- struct decompressor_data *data; // Decompression API specific data
+ struct decompressor_stream *stream; // Decompression API specific data
void (*reset)(struct decompressor_state *state);
size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
- size_t (*put)(struct decompressor_state *state, const char *data, size_t size);
- size_t (*decompress)(struct decompressor_state *state);
+ size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
size_t (*get)(struct decompressor_state *state, char *data, size_t size);
void (*destroy)(struct decompressor_state **state);
@@ -80,11 +124,17 @@ struct decompressor_state {
// Thread-local storage
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+typedef enum {
+ SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
+ SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
+} SENDER_FLAGS;
+
struct sender_state {
RRDHOST *host;
- pid_t task_id;
- unsigned int overflow:1;
- int timeout, default_port;
+ pid_t tid; // the thread id of the sender, from gettid()
+ SENDER_FLAGS flags;
+ int timeout;
+ int default_port;
usec_t reconnect_delay;
char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
size_t begin;
@@ -92,22 +142,62 @@ struct sender_state {
size_t sent_bytes;
size_t sent_bytes_on_this_connection;
size_t send_attempts;
- time_t last_sent_t;
+ time_t last_traffic_seen_t;
size_t not_connected_loops;
// Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
// the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
netdata_mutex_t mutex;
struct circular_buffer *buffer;
- BUFFER *build;
- char read_buffer[512];
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
int read_len;
- int32_t version;
+ STREAM_CAPABILITIES capabilities;
+
+ int rrdpush_sender_pipe[2]; // collector to sender thread signaling
+ int rrdpush_sender_socket;
+
#ifdef ENABLE_COMPRESSION
- unsigned int rrdpush_compression;
struct compressor_state *compressor;
#endif
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl ssl; // structure used to encrypt the connection
+#endif
+
+ struct {
+ DICTIONARY *requests; // de-duplication of replication requests, per chart
+
+ struct {
+ size_t pending_requests; // the currently outstanding replication requests
+ size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
+ } atomic;
+
+ struct {
+ bool reached_max; // used to avoid resetting the replication thread too frequently
+ } unsafe; // protected by sender mutex
+
+ } replication;
+
+ struct {
+ size_t buffer_used_percentage; // the current utilization of the sending buffer
+ usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
+ } atomic;
};
+#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED);
+#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)
+
+#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED);
+#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)
+
+#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)
+
+#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
+
struct receiver_state {
RRDHOST *host;
netdata_thread_t thread;
@@ -127,9 +217,9 @@ struct receiver_state {
char *program_version;
struct rrdhost_system_info *system_info;
int update_every;
- uint32_t stream_version;
+ STREAM_CAPABILITIES capabilities;
time_t last_msg_t;
- char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
int read_len;
unsigned int shutdown:1; // Tell the thread to exit
unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
@@ -140,14 +230,18 @@ struct receiver_state {
unsigned int rrdpush_compression;
struct decompressor_state *decompressor;
#endif
+
+ time_t replication_first_time_t;
};
struct rrdpush_destinations {
- char destination[CONNECTED_TO_SIZE + 1];
- int disabled_no_proper_reply;
- int disabled_because_of_localhost;
- time_t disabled_already_streaming;
- int disabled_because_of_denied_access;
+ STRING *destination;
+
+ const char *last_error;
+ time_t postpone_reconnection_until;
+ STREAM_HANDSHAKE last_handshake;
+
+ struct rrdpush_destinations *prev;
struct rrdpush_destinations *next;
};
@@ -158,27 +252,35 @@ extern unsigned int default_compression_enabled;
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
+extern bool default_rrdpush_enable_replication;
+extern time_t default_rrdpush_seconds_to_replicate;
+extern time_t default_rrdpush_replication_step;
extern unsigned int remote_clock_resync_iterations;
-extern void sender_init(RRDHOST *parent);
-extern struct rrdpush_destinations *destinations_init(const char *destinations);
-void sender_start(struct sender_state *s);
-void sender_commit(struct sender_state *s);
-extern int rrdpush_init();
-extern int configured_as_parent();
-extern void rrdset_done_push(RRDSET *st);
-extern void rrdset_push_chart_definition_now(RRDSET *st);
-extern void *rrdpush_sender_thread(void *ptr);
-extern void rrdpush_send_labels(RRDHOST *host);
-extern void rrdpush_claimed_id(RRDHOST *host);
-
-extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
-extern void rrdpush_sender_thread_stop(RRDHOST *host);
-
-extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv);
-extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
-extern int connect_to_one_of_destinations(
- struct rrdpush_destinations *destinations,
+void rrdpush_destinations_init(RRDHOST *host);
+void rrdpush_destinations_free(RRDHOST *host);
+
+void sender_init(RRDHOST *host);
+
+BUFFER *sender_start(struct sender_state *s);
+void sender_commit(struct sender_state *s, BUFFER *wb);
+void sender_cancel(struct sender_state *s);
+int rrdpush_init();
+bool rrdpush_receiver_needs_dbengine();
+int configured_as_parent();
+void rrdset_done_push(RRDSET *st);
+bool rrdset_push_chart_definition_now(RRDSET *st);
+void *rrdpush_sender_thread(void *ptr);
+void rrdpush_send_host_labels(RRDHOST *host);
+void rrdpush_claimed_id(RRDHOST *host);
+
+int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
+void rrdpush_sender_thread_stop(RRDHOST *host);
+
+void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
+void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
+int connect_to_one_of_destinations(
+ RRDHOST *host,
int default_port,
struct timeval *timeout,
size_t *reconnects_counter,
@@ -186,10 +288,18 @@ extern int connect_to_one_of_destinations(
size_t connected_to_size,
struct rrdpush_destinations **destination);
+void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
+
#ifdef ENABLE_COMPRESSION
struct compressor_state *create_compressor();
struct decompressor_state *create_decompressor();
-size_t is_compressed_data(const char *data, size_t data_size);
#endif
+void log_receiver_capabilities(struct receiver_state *rpt);
+void log_sender_capabilities(struct sender_state *s);
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
+int32_t stream_capabilities_to_vn(uint32_t caps);
+
+#include "replication.h"
+
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index c4836aeaf..8e637d2bd 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
+#include "parser/parser.h"
#define WORKER_SENDER_JOB_CONNECT 0
#define WORKER_SENDER_JOB_PIPE_READ 1
@@ -17,9 +18,15 @@
#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
-
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 15
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 15
+#define WORKER_SENDER_JOB_BUFFER_RATIO 15
+#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
+#define WORKER_SENDER_JOB_BYTES_SENT 17
+#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
+#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
+#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
#endif
extern struct config stream_config;
@@ -27,10 +34,31 @@ extern int netdata_use_ssl_on_stream;
extern char *netdata_ssl_ca_path;
extern char *netdata_ssl_ca_file;
+static __thread BUFFER *sender_thread_buffer = NULL;
+static __thread bool sender_thread_buffer_used = false;
+
+void sender_thread_buffer_free(void) {
+ if(sender_thread_buffer) {
+ buffer_free(sender_thread_buffer);
+ sender_thread_buffer = NULL;
+ }
+}
+
// Collector thread starting a transmission
-void sender_start(struct sender_state *s) {
- netdata_mutex_lock(&s->mutex);
- buffer_flush(s->build);
+BUFFER *sender_start(struct sender_state *s __maybe_unused) {
+ if(!sender_thread_buffer)
+ sender_thread_buffer = buffer_create(1024);
+
+ if(sender_thread_buffer_used)
+ fatal("STREAMING: thread buffer is used multiple times concurrently.");
+
+ sender_thread_buffer_used = true;
+ buffer_flush(sender_thread_buffer);
+ return sender_thread_buffer;
+}
+
+void sender_cancel(struct sender_state *s __maybe_unused) {
+ sender_thread_buffer_used = false;
}
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
@@ -43,137 +71,218 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
*/
static inline void deactivate_compression(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
- error("STREAM_COMPRESSION: Deactivating compression to avoid stream corruption");
- default_compression_enabled = 0;
- s->rrdpush_compression = 0;
- s->version = STREAM_VERSION_CLABELS;
- error("STREAM_COMPRESSION %s [send to %s]: Restarting connection without compression", s->host->hostname, s->connected_to);
+ error("STREAM_COMPRESSION: Compression returned error, disabling it.");
+ s->flags &= ~SENDER_FLAG_COMPRESSION;
+ error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
rrdpush_sender_thread_close_socket(s->host);
}
#endif
+#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
+
// Collector thread finishing a transmission
-void sender_commit(struct sender_state *s) {
- char *src = (char *)buffer_tostring(s->host->sender->build);
- size_t src_len = s->host->sender->build->len;
+void sender_commit(struct sender_state *s, BUFFER *wb) {
+
+ if(unlikely(wb != sender_thread_buffer))
+ fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer.");
+
+ if(unlikely(!sender_thread_buffer_used))
+ fatal("STREAMING: sender is committing a buffer twice.");
+
+ sender_thread_buffer_used = false;
+
+ char *src = (char *)buffer_tostring(wb);
+ size_t src_len = buffer_strlen(wb);
+
+ if(unlikely(!src || !src_len))
+ return;
+
+ netdata_mutex_lock(&s->mutex);
+
+ if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
+ info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
+ rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
+
+ s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
+ }
+
#ifdef ENABLE_COMPRESSION
- if (src && src_len) {
- if (s->compressor && s->rrdpush_compression) {
- src_len = s->compressor->compress(s->compressor, src, src_len, &src);
- if (!src_len) {
- deactivate_compression(s);
- buffer_flush(s->build);
- netdata_mutex_unlock(&s->mutex);
- return;
+ if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
+ while(src_len) {
+ size_t size_to_compress = src_len;
+
+ if(unlikely(size_to_compress > COMPRESSION_MAX_MSG_SIZE)) {
+ if (stream_has_capability(s, STREAM_CAP_BINARY))
+ size_to_compress = COMPRESSION_MAX_MSG_SIZE;
+ else {
+ if (size_to_compress > COMPRESSION_MAX_MSG_SIZE) {
+ // we need to find the last newline
+ // so that the decompressor will have a whole line to work with
+
+ const char *t = &src[COMPRESSION_MAX_MSG_SIZE];
+ while (--t >= src)
+ if (unlikely(*t == '\n'))
+ break;
+
+ if (t <= src) {
+ size_to_compress = COMPRESSION_MAX_MSG_SIZE;
+ } else
+ size_to_compress = t - src + 1;
+ }
+ }
}
+
+ char *dst;
+ size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ if (!dst_len) {
+ error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
+ rrdhost_hostname(s->host), s->connected_to);
+
+ s->compressor->reset(s->compressor);
+ dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ if(!dst_len) {
+ error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
+ rrdhost_hostname(s->host), s->connected_to);
+
+ deactivate_compression(s);
+ netdata_mutex_unlock(&s->mutex);
+ return;
+ }
+ }
+
+ if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len))
+ s->flags |= SENDER_FLAG_OVERFLOW;
+
+ src = src + size_to_compress;
+ src_len -= size_to_compress;
}
- if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
- s->overflow = 1;
}
+ else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ s->flags |= SENDER_FLAG_OVERFLOW;
#else
if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
- s->overflow = 1;
+ s->flags |= SENDER_FLAG_OVERFLOW;
#endif
- buffer_flush(s->build);
- netdata_mutex_unlock(&s->mutex);
-}
+ replication_recalculate_buffer_used_ratio_unsafe(s);
-static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
- __atomic_clear(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
-
- if(host->rrdpush_sender_socket != -1) {
- close(host->rrdpush_sender_socket);
- host->rrdpush_sender_socket = -1;
- }
+ netdata_mutex_unlock(&s->mutex);
+ rrdpush_signal_sender_to_wake_up(s);
}
-static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) {
- NETDATA_DOUBLE *value = (NETDATA_DOUBLE *)rv->value;
-
+static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
buffer_sprintf(
- host->sender->build
+ wb
, "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
- , rv->name
- , *value
+ , rrdvar_name(rva)
+ , rrdvar2number(rva)
);
- debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rv->name, *value);
+ debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
}
-void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
- if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && __atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)) {
- sender_start(host->sender);
- rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
- sender_commit(host->sender);
+void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
+ if(rrdhost_can_send_definitions_to_parent(host)) {
+ BUFFER *wb = sender_start(host->sender);
+ rrdpush_sender_add_host_variable_to_buffer(wb, rva);
+ sender_commit(host->sender, wb);
}
}
+struct custom_host_variables_callback {
+ BUFFER *wb;
+};
-static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr, void *host_ptr) {
- RRDVAR *rv = (RRDVAR *)rrdvar_ptr;
- RRDHOST *host = (RRDHOST *)host_ptr;
+static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) {
+ const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
+ struct custom_host_variables_callback *tmp = struct_ptr;
+ BUFFER *wb = tmp->wb;
- if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) {
- rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
-
- // return 1, so that the traversal will return the number of variables sent
+ if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) {
+ rrdpush_sender_add_host_variable_to_buffer(wb, rv);
return 1;
}
-
- // returning a negative number will break the traversal
return 0;
}
static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
- sender_start(host->sender);
- int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host);
- (void)ret;
- sender_commit(host->sender);
-
- debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+ if(rrdhost_can_send_definitions_to_parent(host)) {
+ BUFFER *wb = sender_start(host->sender);
+ struct custom_host_variables_callback tmp = {
+ .wb = wb
+ };
+ int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
+ (void)ret;
+ sender_commit(host->sender, wb);
+
+ debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
+ }
}
// resets all the chart, so that their definitions
// will be resent to the central netdata
static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
- rrdhost_rdlock(host);
+ error("Clearing stream_collected_metrics flag in charts of host %s", rrdhost_hostname(host));
RRDSET *st;
rrdset_foreach_read(st, host) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
st->upstream_resync_time = 0;
- rrdset_rdlock(st);
-
RRDDIM *rd;
rrddim_foreach_read(rd, st)
rd->exposed = 0;
-
- rrdset_unlock(st);
+ rrddim_foreach_done(rd);
}
+ rrdset_foreach_done(st);
- rrdhost_unlock(host);
+ rrdhost_sender_replicating_charts_zero(host);
}
-static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
+static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
+ rrdpush_sender_set_flush_time(host->sender);
+
netdata_mutex_lock(&host->sender->mutex);
- size_t len = cbuffer_next_unsafe(host->sender->buffer, NULL);
- if (len)
- error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, len);
+ // flush the output buffer from any data it may have
+ cbuffer_flush(host->sender->buffer);
+ replication_recalculate_buffer_used_ratio_unsafe(host->sender);
- cbuffer_remove_unsafe(host->sender->buffer, len);
netdata_mutex_unlock(&host->sender->mutex);
+}
+
+static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
+ rrdpush_sender_set_flush_time(host->sender);
+
+ // stop all replication commands inflight
+ replication_sender_delete_pending_requests(host->sender);
+ // reset the state of all charts
rrdpush_sender_thread_reset_all_charts(host);
+
+ rrdpush_sender_replicating_charts_zero(host->sender);
+}
+
+static void rrdpush_sender_on_connect(RRDHOST *host) {
+ rrdpush_sender_cbuffer_flush(host);
+ rrdpush_sender_charts_and_replication_reset(host);
rrdpush_sender_thread_send_custom_host_variables(host);
}
-static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) {
- rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
+ if(host->sender->rrdpush_sender_socket != -1) {
+ close(host->sender->rrdpush_sender_socket);
+ host->sender->rrdpush_sender_socket = -1;
+ }
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+
+ // do not flush the circular buffer here
+ // this function is called sometimes with the mutex lock, sometimes without the lock
+ rrdpush_sender_charts_and_replication_reset(host);
}
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
@@ -203,52 +312,123 @@ void rrdpush_clean_encoded(stream_encoded_t *se)
freez(se->kernel_version);
}
-static inline long int parse_stream_version_for_errors(char *http)
-{
- if (!memcmp(http, START_STREAMING_ERROR_SAME_LOCALHOST, sizeof(START_STREAMING_ERROR_SAME_LOCALHOST)))
- return -2;
- else if (!memcmp(http, START_STREAMING_ERROR_ALREADY_STREAMING, sizeof(START_STREAMING_ERROR_ALREADY_STREAMING)))
- return -3;
- else if (!memcmp(http, START_STREAMING_ERROR_NOT_PERMITTED, sizeof(START_STREAMING_ERROR_NOT_PERMITTED)))
- return -4;
- else
- return -1;
-}
+struct {
+ const char *response;
+ size_t length;
+ int32_t version;
+ bool dynamic;
+ const char *error;
+ int worker_job_id;
+ time_t postpone_reconnect_seconds;
+} stream_responses[] = {
+ {
+ .response = START_STREAMING_PROMPT_VN,
+ .length = sizeof(START_STREAMING_PROMPT_VN) - 1,
+ .version = STREAM_HANDSHAKE_OK_V3, // and above
+ .dynamic = true, // dynamic = we will parse the version / capabilities
+ .error = NULL,
+ .worker_job_id = 0,
+ .postpone_reconnect_seconds = 0,
+ },
+ {
+ .response = START_STREAMING_PROMPT_V2,
+ .length = sizeof(START_STREAMING_PROMPT_V2) - 1,
+ .version = STREAM_HANDSHAKE_OK_V2,
+ .dynamic = false,
+ .error = NULL,
+ .worker_job_id = 0,
+ .postpone_reconnect_seconds = 0,
+ },
+ {
+ .response = START_STREAMING_PROMPT_V1,
+ .length = sizeof(START_STREAMING_PROMPT_V1) - 1,
+ .version = STREAM_HANDSHAKE_OK_V1,
+ .dynamic = false,
+ .error = NULL,
+ .worker_job_id = 0,
+ .postpone_reconnect_seconds = 0,
+ },
+ {
+ .response = START_STREAMING_ERROR_SAME_LOCALHOST,
+ .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
+ .version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
+ .dynamic = false,
+ .error = "remote server rejected this stream, the host we are trying to stream is its localhost",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
+ },
+ {
+ .response = START_STREAMING_ERROR_ALREADY_STREAMING,
+ .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
+ .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
+ .dynamic = false,
+ .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ },
+ {
+ .response = START_STREAMING_ERROR_NOT_PERMITTED,
+ .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
+ .version = STREAM_HANDSHAKE_ERROR_DENIED,
+ .dynamic = false,
+ .error = "remote server denied access, probably we don't have the right API key?",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 1 * 60, // 1 minute
+ },
+
+ // terminator
+ {
+ .response = NULL,
+ .length = 0,
+ .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
+ .dynamic = false,
+ .error = "remote node response is not understood, is it Netdata?",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 1 * 60, // 1 minute
+ }
+};
-static inline long int parse_stream_version(RRDHOST *host, char *http)
-{
- long int stream_version = -1;
- int answer = -1;
- char *stream_version_start = strchr(http, '=');
- if (stream_version_start) {
- stream_version_start++;
- stream_version = strtol(stream_version_start, NULL, 10);
- answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(stream_version_start - http));
- if (!answer) {
- rrdpush_set_flags_to_newest_stream(host);
+static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender_state *s, char *http, size_t http_length) {
+ int32_t version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE;
+
+ int i;
+ for(i = 0; stream_responses[i].response ; i++) {
+ if(stream_responses[i].dynamic &&
+ http_length > stream_responses[i].length && http_length < (stream_responses[i].length + 30) &&
+ strncmp(http, stream_responses[i].response, stream_responses[i].length) == 0) {
+
+ version = str2i(&http[stream_responses[i].length]);
+ break;
}
- } else {
- answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
- if (!answer) {
- stream_version = 1;
- rrdpush_set_flags_to_newest_stream(host);
- } else {
- answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
- if (!answer) {
- stream_version = 0;
- rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
- rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- }
- else {
- stream_version = parse_stream_version_for_errors(http);
- }
+ else if(http_length == stream_responses[i].length && strcmp(http, stream_responses[i].response) == 0) {
+ version = stream_responses[i].version;
+
+ break;
}
}
- return stream_version;
+ const char *error = stream_responses[i].error;
+ int worker_job_id = stream_responses[i].worker_job_id;
+ time_t delay = stream_responses[i].postpone_reconnect_seconds;
+
+ if(version >= STREAM_HANDSHAKE_OK_V1) {
+ host->destination->last_error = NULL;
+ host->destination->last_handshake = version;
+ host->destination->postpone_reconnection_until = 0;
+ s->capabilities = convert_stream_version_to_capabilities(version);
+ return true;
+ }
+
+ error("STREAM %s [send to %s]: %s.", rrdhost_hostname(host), s->connected_to, error);
+
+ worker_is_busy(worker_job_id);
+ rrdpush_sender_thread_close_socket(host);
+ host->destination->last_error = error;
+ host->destination->last_handshake = version;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
+ return false;
}
-static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout,
- struct sender_state *s) {
+static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
struct timeval tv = {
.tv_sec = timeout,
@@ -258,11 +438,8 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
// make sure the socket is closed
rrdpush_sender_thread_close_socket(host);
- debug(D_STREAM, "STREAM: Attempting to connect...");
- info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination);
-
- host->rrdpush_sender_socket = connect_to_one_of_destinations(
- host->destinations
+ s->rrdpush_sender_socket = connect_to_one_of_destinations(
+ host
, default_port
, &tv
, &s->reconnects_counter
@@ -271,48 +448,50 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
, &host->destination
);
- if(unlikely(host->rrdpush_sender_socket == -1)) {
- error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_send_destination);
- return 0;
+ if(unlikely(s->rrdpush_sender_socket == -1)) {
+ error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
+ return false;
}
- info("STREAM %s [send to %s]: initializing communication...", host->hostname, s->connected_to);
+ info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
#ifdef ENABLE_HTTPS
- if( netdata_client_ctx ){
- host->ssl.flags = NETDATA_SSL_START;
- if (!host->ssl.conn){
- host->ssl.conn = SSL_new(netdata_client_ctx);
- if(!host->ssl.conn){
+ if(netdata_ssl_client_ctx){
+ host->sender->ssl.flags = NETDATA_SSL_START;
+ if (!host->sender->ssl.conn){
+ host->sender->ssl.conn = SSL_new(netdata_ssl_client_ctx);
+ if(!host->sender->ssl.conn){
error("Failed to allocate SSL structure.");
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
}
}
else{
- SSL_clear(host->ssl.conn);
+ SSL_clear(host->sender->ssl.conn);
}
- if (host->ssl.conn)
+ if (host->sender->ssl.conn)
{
- if (SSL_set_fd(host->ssl.conn, host->rrdpush_sender_socket) != 1) {
- error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket);
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ if (SSL_set_fd(host->sender->ssl.conn, s->rrdpush_sender_socket) != 1) {
+ error("Failed to set the socket to the SSL on socket fd %d.", s->rrdpush_sender_socket);
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
} else{
- host->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
+ host->sender->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
}
}
}
else {
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
}
#endif
+ // reset our capabilities to default
+ s->capabilities = STREAM_OUR_CAPABILITIES;
+
#ifdef ENABLE_COMPRESSION
-// Negotiate stream VERSION_CLABELS if stream compression is not supported
-s->rrdpush_compression = (default_compression_enabled && (s->version >= STREAM_VERSION_COMPRESSION));
-if(!s->rrdpush_compression)
- s->version = STREAM_VERSION_CLABELS;
-#endif //ENABLE_COMPRESSION
+ // If we don't want compression, remove it from our capabilities
+ if(!(s->flags & SENDER_FLAG_COMPRESSION))
+ s->capabilities &= ~STREAM_CAP_COMPRESSION;
+#endif // ENABLE_COMPRESSION
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
@@ -337,7 +516,7 @@ if(!s->rrdpush_compression)
"&ml_enabled=%d"
"&mc_version=%d"
"&tags=%s"
- "&ver=%d"
+ "&ver=%u"
"&NETDATA_INSTANCE_CLOUD_TYPE=%s"
"&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
"&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s"
@@ -370,20 +549,20 @@ if(!s->rrdpush_compression)
"User-Agent: %s/%s\r\n"
"Accept: */*\r\n\r\n"
, host->rrdpush_send_api_key
- , host->hostname
- , host->registry_hostname
+ , rrdhost_hostname(host)
+ , rrdhost_registry_hostname(host)
, host->machine_guid
, default_rrd_update_every
- , host->os
- , host->timezone
- , host->abbrev_timezone
+ , rrdhost_os(host)
+ , rrdhost_timezone(host)
+ , rrdhost_abbrev_timezone(host)
, host->utc_offset
, host->system_info->hops + 1
, host->system_info->ml_capable
, host->system_info->ml_enabled
, host->system_info->mc_version
- , (host->tags) ? host->tags : ""
- , s->version
+ , rrdhost_tags(host)
+ , s->capabilities
, (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
, (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
, (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : ""
@@ -412,146 +591,128 @@ if(!s->rrdpush_compression)
, (host->system_info->host_ram_total) ? host->system_info->host_ram_total : ""
, (host->system_info->host_disk_space) ? host->system_info->host_disk_space : ""
, STREAMING_PROTOCOL_VERSION
- , host->program_name
- , host->program_version
+ , rrdhost_program_name(host)
+ , rrdhost_program_version(host)
);
http[eol] = 0x00;
rrdpush_clean_encoded(&se);
#ifdef ENABLE_HTTPS
- if (!host->ssl.flags) {
+ if (!host->sender->ssl.flags) {
ERR_clear_error();
- SSL_set_connect_state(host->ssl.conn);
- int err = SSL_connect(host->ssl.conn);
+ SSL_set_connect_state(host->sender->ssl.conn);
+ int err = SSL_connect(host->sender->ssl.conn);
if (err != 1){
- err = SSL_get_error(host->ssl.conn, err);
- error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->ssl.conn,err),NULL));
+ err = SSL_get_error(host->sender->ssl.conn, err);
+ error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->sender->ssl.conn,err),NULL));
if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
rrdpush_sender_thread_close_socket(host);
- if (host->destination->next)
- host->destination->disabled_no_proper_reply = 1;
- return 0;
- }else {
- host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
+ host->destination->last_error = "SSL error";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+ return false;
+ }
+ else {
+ host->sender->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
}
}
else {
if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
- if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
- if ( security_test_certificate(host->ssl.conn)) {
+ if (netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
+ if ( security_test_certificate(host->sender->ssl.conn)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
error("Closing the stream connection, because the server SSL certificate is not valid.");
rrdpush_sender_thread_close_socket(host);
- if (host->destination->next)
- host->destination->disabled_no_proper_reply = 1;
- return 0;
+ host->destination->last_error = "invalid SSL certificate";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
+ return false;
}
}
}
}
}
- if(send_timeout(&host->ssl,host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
-#else
- if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
#endif
+
+ ssize_t bytes;
+
+ bytes = send_timeout(
+#ifdef ENABLE_HTTPS
+ &host->sender->ssl,
+#endif
+ s->rrdpush_sender_socket,
+ http,
+ strlen(http),
+ 0,
+ timeout);
+
+ if(bytes <= 0) { // timeout is 0
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", host->hostname, s->connected_to);
rrdpush_sender_thread_close_socket(host);
- return 0;
+ error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
+ host->destination->last_error = "timeout while sending request";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
+ return false;
}
- info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, s->connected_to);
+ info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to);
- ssize_t received;
+ bytes = recv_timeout(
#ifdef ENABLE_HTTPS
- received = recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
- if(received == -1) {
-#else
- received = recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
- if(received == -1) {
+ &host->sender->ssl,
#endif
+ s->rrdpush_sender_socket,
+ http,
+ HTTP_HEADER_SIZE,
+ 0,
+ timeout);
+
+ if(bytes <= 0) { // timeout is 0
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, s->connected_to);
rrdpush_sender_thread_close_socket(host);
- return 0;
+ error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
+ host->destination->last_error = "timeout while expecting first response";
+ host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
+ host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
+ return false;
}
- http[received] = '\0';
+ http[bytes] = '\0';
debug(D_STREAM, "Response to sender from far end: %s", http);
- int32_t version = (int32_t)parse_stream_version(host, http);
- if(version == -1) {
- worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE);
- error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to);
- rrdpush_sender_thread_close_socket(host);
- //catch other reject reasons and force to check other destinations
- if (host->destination->next)
- host->destination->disabled_no_proper_reply = 1;
- return 0;
- }
- else if(version == -2) {
- error("STREAM %s [send to %s]: remote server is the localhost for [%s].", host->hostname, s->connected_to, host->hostname);
- rrdpush_sender_thread_close_socket(host);
- host->destination->disabled_because_of_localhost = 1;
- return 0;
- }
- else if(version == -3) {
- error("STREAM %s [send to %s]: remote server already receives metrics for [%s].", host->hostname, s->connected_to, host->hostname);
- rrdpush_sender_thread_close_socket(host);
- host->destination->disabled_already_streaming = now_realtime_sec();
- return 0;
- }
- else if(version == -4) {
- error("STREAM %s [send to %s]: remote server denied access for [%s].", host->hostname, s->connected_to, host->hostname);
- rrdpush_sender_thread_close_socket(host);
- if (host->destination->next)
- host->destination->disabled_because_of_denied_access = 1;
- return 0;
- }
- s->version = version;
+ if(!rrdpush_sender_validate_response(host, s, http, bytes))
+ return false;
#ifdef ENABLE_COMPRESSION
- s->rrdpush_compression = (s->rrdpush_compression && (s->version >= STREAM_VERSION_COMPRESSION));
- if(s->rrdpush_compression)
- {
- // parent supports compression
- if(s->compressor)
+ if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
+ if(!s->compressor)
+ s->compressor = create_compressor();
+ else
s->compressor->reset(s->compressor);
}
- else {
- //parent does not support compression or has compression disabled
- debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
- infoerr("Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
- s->version = STREAM_VERSION_CLABELS;
- }
#endif //ENABLE_COMPRESSION
+ log_sender_capabilities(s);
- info("STREAM %s [send to %s]: established communication with a parent using protocol version %d - ready to send metrics..."
- , host->hostname
- , s->connected_to
- , s->version);
-
- if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, s->connected_to);
+ if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
- if(sock_enlarge_out(host->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, s->connected_to);
+ if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
+ error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
- debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket);
+ debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
- return 1;
+ return true;
}
-static void attempt_to_connect(struct sender_state *state)
+static bool attempt_to_connect(struct sender_state *state)
{
state->send_attempts = 0;
if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
- state->last_sent_t = now_monotonic_sec();
-
// reset the buffer, to properly send charts and metrics
- rrdpush_sender_thread_data_flush(state->host);
+ rrdpush_sender_on_connect(state->host);
// send from the beginning
state->begin = 0;
@@ -563,372 +724,628 @@ static void attempt_to_connect(struct sender_state *state)
state->sent_bytes_on_this_connection = 0;
// let the data collection threads know we are ready
- __atomic_test_and_set(&state->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
+ rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+
+ return true;
}
- else {
- // increase the failed connections counter
- state->not_connected_loops++;
- // reset the number of bytes sent
- state->sent_bytes_on_this_connection = 0;
+ // we couldn't connect
- // slow re-connection on repeating errors
- sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds
- }
+ // increase the failed connections counter
+ state->not_connected_loops++;
+
+ // reset the number of bytes sent
+ state->sent_bytes_on_this_connection = 0;
+
+ // slow re-connection on repeating errors
+ sleep_usec(USEC_PER_SEC * state->reconnect_delay); // seconds
+
+ return false;
}
// TCP window is open and we have data to transmit.
-void attempt_to_send(struct sender_state *s) {
-
- rrdpush_send_labels(s->host);
+static ssize_t attempt_to_send(struct sender_state *s) {
+ ssize_t ret = 0;
#ifdef NETDATA_INTERNAL_CHECKS
struct circular_buffer *cb = s->buffer;
#endif
- netdata_thread_disable_cancelability();
netdata_mutex_lock(&s->mutex);
char *chunk;
size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
- ssize_t ret;
+
#ifdef ENABLE_HTTPS
- SSL *conn = s->host->ssl.conn ;
- if(conn && !s->host->ssl.flags) {
- ret = SSL_write(conn, chunk, outstanding);
- } else {
- ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
- }
+ SSL *conn = s->host->sender->ssl.conn ;
+ if(conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ ret = netdata_ssl_write(conn, chunk, outstanding);
+ else
+ ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
#else
- ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
+ ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
#endif
+
if (likely(ret > 0)) {
cbuffer_remove_unsafe(s->buffer, ret);
s->sent_bytes_on_this_connection += ret;
s->sent_bytes += ret;
- debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret);
- s->last_sent_t = now_monotonic_sec();
+ debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
}
else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
- debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", s->host->hostname, s->connected_to);
+ debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
else if (ret == -1) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
debug(D_STREAM, "STREAM: Send failed - closing socket...");
- error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", s->host->hostname, s->connected_to, s->sent_bytes_on_this_connection);
+ error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
- else {
+ else
debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
- }
+ replication_recalculate_buffer_used_ratio_unsafe(s);
netdata_mutex_unlock(&s->mutex);
- netdata_thread_enable_cancelability();
+
+ return ret;
}
-void attempt_read(struct sender_state *s) {
-int ret;
+static ssize_t attempt_read(struct sender_state *s) {
+ ssize_t ret = 0;
+
#ifdef ENABLE_HTTPS
- if (s->host->ssl.conn && !s->host->stream_ssl.flags) {
- ERR_clear_error();
- int desired = sizeof(s->read_buffer) - s->read_len - 1;
- ret = SSL_read(s->host->ssl.conn, s->read_buffer, desired);
+ if (s->host->sender->ssl.conn && s->host->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
+ size_t desired = sizeof(s->read_buffer) - s->read_len - 1;
+ ret = netdata_ssl_read(s->host->sender->ssl.conn, s->read_buffer, desired);
if (ret > 0 ) {
- s->read_len += ret;
- return;
+ s->read_len += (int)ret;
+ return ret;
}
- int sslerrno = SSL_get_error(s->host->ssl.conn, desired);
- if (sslerrno == SSL_ERROR_WANT_READ || sslerrno == SSL_ERROR_WANT_WRITE)
- return;
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
- u_long err;
- char buf[256];
- while ((err = ERR_get_error()) != 0) {
- ERR_error_string_n(err, buf, sizeof(buf));
- error("STREAM %s [send to %s] ssl error: %s", s->host->hostname, s->connected_to, buf);
- }
- error("Restarting connection");
rrdpush_sender_thread_close_socket(s->host);
- return;
+ return ret;
}
#endif
- ret = recv(s->host->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
- if (ret>0) {
+ ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
+ if (ret > 0) {
s->read_len += ret;
- return;
+ return ret;
}
- debug(D_STREAM, "Socket was POLLIN, but req %zu bytes gave %d", sizeof(s->read_buffer) - s->read_len - 1, ret);
-
- if (ret<0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
- return;
+ if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
+ return ret;
- if (ret==0) {
+ if (ret == 0 || errno == ECONNRESET) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
- error("STREAM %s [send to %s]: connection closed by far end. Restarting connection", s->host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
}
else {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
- error("STREAM %s [send to %s]: error during receive (%d). Restarting connection", s->host->hostname, s->connected_to, ret);
+ error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
}
rrdpush_sender_thread_close_socket(s->host);
+
+ return ret;
+}
+
+struct inflight_stream_function {
+ struct sender_state *sender;
+ STRING *transaction;
+ usec_t received_ut;
+};
+
+void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
+ struct inflight_stream_function *tmp = data;
+
+ struct sender_state *s = tmp->sender;
+
+ if(rrdhost_can_send_definitions_to_parent(s->host)) {
+ BUFFER *wb = sender_start(s);
+
+ pluginsd_function_result_begin_to_buffer(wb
+ , string2str(tmp->transaction)
+ , code
+ , functions_content_type_to_format(func_wb->contenttype)
+ , func_wb->expires);
+
+ buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
+ pluginsd_function_result_end_to_buffer(wb);
+
+ sender_commit(s, wb);
+
+ internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
+ rrdhost_hostname(s->host), s->connected_to,
+ string2str(tmp->transaction),
+ buffer_strlen(func_wb),
+ now_realtime_usec() - tmp->received_ut);
+ }
+ string_freez(tmp->transaction);
+ buffer_free(func_wb);
+ freez(tmp);
}
// This is just a placeholder until the gap filling state machine is inserted
void execute_commands(struct sender_state *s) {
+ worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+
char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
*end = 0;
- while( start<end && (newline=strchr(start, '\n')) ) {
- *newline = 0;
- info("STREAM %s [send to %s] received command over connection: %s", s->host->hostname, s->connected_to, start);
- start = newline+1;
+ while( start < end && (newline = strchr(start, '\n')) ) {
+ *newline = '\0';
+
+ log_access("STREAM: %d from '%s' for host '%s': %s",
+ gettid(), s->connected_to, rrdhost_hostname(s->host), start);
+
+ internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
+
+ char *words[PLUGINSD_MAX_WORDS] = { NULL };
+ size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+
+ const char *keyword = get_word(words, num_words, 0);
+
+ if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
+
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ rrdhost_hostname(s->host), s->connected_to,
+ keyword,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+ else {
+ int timeout = str2i(timeout_s);
+ if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
+
+ struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
+ tmp->received_ut = now_realtime_usec();
+ tmp->sender = s;
+ tmp->transaction = string_strdupz(transaction);
+ BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1);
+
+ int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
+ if(code != HTTP_RESP_OK) {
+ rrd_call_function_error(wb, "Failed to route request to collector", code);
+ stream_execute_function_callback(wb, code, tmp);
+ }
+ }
+ }
+ else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
+
+ const char *chart_id = get_word(words, num_words, 1);
+ const char *start_streaming = get_word(words, num_words, 2);
+ const char *after = get_word(words, num_words, 3);
+ const char *before = get_word(words, num_words, 4);
+
+ if (!chart_id || !start_streaming || !after || !before) {
+ error("STREAM %s [send to %s] %s command is incomplete"
+ " (chart=%s, start_streaming=%s, after=%s, before=%s)",
+ rrdhost_hostname(s->host), s->connected_to,
+ keyword,
+ chart_id ? chart_id : "(unset)",
+ start_streaming ? start_streaming : "(unset)",
+ after ? after : "(unset)",
+ before ? before : "(unset)");
+ }
+ else {
+ replication_add_request(s, chart_id,
+ strtoll(after, NULL, 0),
+ strtoll(before, NULL, 0),
+ !strcmp(start_streaming, "true")
+ );
+ }
+ }
+ else {
+ error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
+ }
+
+ worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+ start = newline + 1;
}
- if (start<end) {
+ if (start < end) {
memmove(s->read_buffer, start, end-start);
- s->read_len = end-start;
+ s->read_len = end - start;
+ }
+ else {
+ s->read_buffer[0] = '\0';
+ s->read_len = 0;
}
}
+struct rrdpush_sender_thread_data {
+ struct sender_state *sender_state;
+ RRDHOST *host;
+ char *pipe_buffer;
+};
-static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
- worker_unregister();
+static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) {
+ static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
- RRDHOST *host = (RRDHOST *)ptr;
+ bool ret = true;
- netdata_mutex_lock(&host->sender->mutex);
+ netdata_mutex_lock(&mutex);
- info("STREAM %s [send]: sending thread cleans up...", host->hostname);
+ int new_pipe_fds[2];
+ if(reopen) {
+ if(pipe(new_pipe_fds) != 0) {
+ error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
+ new_pipe_fds[PIPE_READ] = -1;
+ new_pipe_fds[PIPE_WRITE] = -1;
+ ret = false;
+ }
+ }
- rrdpush_sender_thread_close_socket(host);
+ int old_pipe_fds[2];
+ old_pipe_fds[PIPE_READ] = pipe_fds[PIPE_READ];
+ old_pipe_fds[PIPE_WRITE] = pipe_fds[PIPE_WRITE];
- // close the pipe
- if(host->rrdpush_sender_pipe[PIPE_READ] != -1) {
- close(host->rrdpush_sender_pipe[PIPE_READ]);
- host->rrdpush_sender_pipe[PIPE_READ] = -1;
+ if(reopen) {
+ pipe_fds[PIPE_READ] = new_pipe_fds[PIPE_READ];
+ pipe_fds[PIPE_WRITE] = new_pipe_fds[PIPE_WRITE];
+ }
+ else {
+ pipe_fds[PIPE_READ] = -1;
+ pipe_fds[PIPE_WRITE] = -1;
}
- if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1) {
- close(host->rrdpush_sender_pipe[PIPE_WRITE]);
- host->rrdpush_sender_pipe[PIPE_WRITE] = -1;
+ if(old_pipe_fds[PIPE_READ] > 2)
+ close(old_pipe_fds[PIPE_READ]);
+
+ if(old_pipe_fds[PIPE_WRITE] > 2)
+ close(old_pipe_fds[PIPE_WRITE]);
+
+ netdata_mutex_unlock(&mutex);
+ return ret;
+}
+
+void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
+ if(unlikely(s->tid == gettid()))
+ return;
+
+ RRDHOST *host = s->host;
+
+ int pipe_fd = s->rrdpush_sender_pipe[PIPE_WRITE];
+
+ // signal the sender there are more data
+ if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) {
+ error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
+ rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true);
}
+}
+
+static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
+ struct rrdpush_sender_thread_data *data = ptr;
+ worker_unregister();
+
+ RRDHOST *host = data->host;
- if(!host->rrdpush_sender_join) {
- info("STREAM %s [send]: sending thread detaches itself.", host->hostname);
+ netdata_mutex_lock(&host->sender->mutex);
+
+ info("STREAM %s [send]: sending thread cleans up...", rrdhost_hostname(host));
+
+ rrdpush_sender_thread_close_socket(host);
+ rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
+
+ if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN)) {
+ info("STREAM %s [send]: sending thread detaches itself.", rrdhost_hostname(host));
netdata_thread_detach(netdata_thread_self());
}
- host->rrdpush_sender_spawn = 0;
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
- info("STREAM %s [send]: sending thread now exits.", host->hostname);
+ info("STREAM %s [send]: sending thread now exits.", rrdhost_hostname(host));
netdata_mutex_unlock(&host->sender->mutex);
+
+ freez(data->pipe_buffer);
+ freez(data);
}
-void sender_init(RRDHOST *parent)
+void sender_init(RRDHOST *host)
{
- if (parent->sender)
+ if (host->sender)
return;
- parent->sender = callocz(1, sizeof(*parent->sender));
- parent->sender->host = parent;
- parent->sender->buffer = cbuffer_new(1024, 1024*1024);
- parent->sender->build = buffer_create(1);
+ host->sender = callocz(1, sizeof(*host->sender));
+ host->sender->host = host;
+ host->sender->buffer = cbuffer_new(1024, 1024 * 1024);
+ host->sender->capabilities = STREAM_OUR_CAPABILITIES;
+
+ host->sender->rrdpush_sender_pipe[PIPE_READ] = -1;
+ host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1;
+ host->sender->rrdpush_sender_socket = -1;
+
#ifdef ENABLE_COMPRESSION
- parent->sender->rrdpush_compression = default_compression_enabled;
- if (default_compression_enabled)
- parent->sender->compressor = create_compressor();
+ if(default_compression_enabled) {
+ host->sender->flags |= SENDER_FLAG_COMPRESSION;
+ host->sender->compressor = create_compressor();
+ }
+ else
+ host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
#endif
- netdata_mutex_init(&parent->sender->mutex);
+
+ netdata_mutex_init(&host->sender->mutex);
+ replication_init_sender(host->sender);
}
void *rrdpush_sender_thread(void *ptr) {
+ worker_register("STREAMSND");
+ worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
+ worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
+ worker_register_job_name(WORKER_SENDER_JOB_SOCKET_RECEIVE, "receive");
+ worker_register_job_name(WORKER_SENDER_JOB_EXECUTE, "execute");
+ worker_register_job_name(WORKER_SENDER_JOB_SOCKET_SEND, "send");
+
+ // disconnection reasons
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR, "disconnect receive error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
+
+ worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
+ worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
+
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
+
struct sender_state *s = ptr;
- s->task_id = gettid();
+ s->tid = gettid();
- if(!s->host->rrdpush_send_enabled || !s->host->rrdpush_send_destination ||
+ if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
!*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
!*s->host->rrdpush_send_api_key) {
error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
- s->host->hostname, s->task_id);
+ rrdhost_hostname(s->host), s->tid);
return NULL;
}
#ifdef ENABLE_HTTPS
if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ){
security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
- security_location_for_context(netdata_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
+ ssl_security_location_for_context(netdata_ssl_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
}
#endif
- info("STREAM %s [send]: thread created (task id %d)", s->host->hostname, s->task_id);
+ info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), s->tid);
+
+ s->timeout = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
+
+ s->default_port = (int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
+
+ s->buffer->max_size = (size_t)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024 * 10);
+
+ s->reconnect_delay = (unsigned int)appconfig_get_number(
+ &stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
- s->timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
- s->default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
- s->buffer->max_size =
- (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024 * 10);
- s->reconnect_delay =
- (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
remote_clock_resync_iterations = (unsigned int)appconfig_get_number(
&stream_config, CONFIG_SECTION_STREAM,
"initial clock resync iterations",
remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
// initialize rrdpush globals
- __atomic_clear(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
- if(pipe(s->host->rrdpush_sender_pipe) == -1) {
- error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname);
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+
+ int pipe_buffer_size = 10 * 1024;
+#ifdef F_GETPIPE_SZ
+ pipe_buffer_size = fcntl(s->rrdpush_sender_pipe[PIPE_READ], F_GETPIPE_SZ);
+#endif
+ if(pipe_buffer_size < 10 * 1024)
+ pipe_buffer_size = 10 * 1024;
+
+ if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
+ error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
+ rrdhost_hostname(s->host));
return NULL;
}
- s->version = STREAMING_PROTOCOL_CURRENT_VERSION;
- enum {
- Collector,
- Socket
- };
- struct pollfd fds[2];
- fds[Collector].fd = s->host->rrdpush_sender_pipe[PIPE_READ];
- fds[Collector].events = POLLIN;
+ struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
+ thread_data->pipe_buffer = mallocz(pipe_buffer_size);
+ thread_data->sender_state = s;
+ thread_data->host = s->host;
- worker_register("STREAMSND");
- worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
- worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
- worker_register_job_name(WORKER_SENDER_JOB_SOCKET_RECEIVE, "receive");
- worker_register_job_name(WORKER_SENDER_JOB_EXECUTE, "execute");
- worker_register_job_name(WORKER_SENDER_JOB_SOCKET_SEND, "send");
+ // reset our cleanup flags
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN);
- // disconnection reasons
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR, "disconnect receive error");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
- worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
+ netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
- netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, s->host);
- for(; s->host->rrdpush_send_enabled && !netdata_exit ;) {
+ for(; rrdhost_has_rrdpush_sender_enabled(s->host) && !netdata_exit ;) {
// check for outstanding cancellation requests
netdata_thread_testcancel();
// The connection attempt blocks (after which we use the socket in nonblocking)
- if(unlikely(s->host->rrdpush_sender_socket == -1)) {
+ if(unlikely(s->rrdpush_sender_socket == -1)) {
worker_is_busy(WORKER_SENDER_JOB_CONNECT);
- s->overflow = 0;
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ s->flags &= ~SENDER_FLAG_OVERFLOW;
s->read_len = 0;
s->buffer->read = 0;
s->buffer->write = 0;
- attempt_to_connect(s);
- if (s->version >= VERSION_GAP_FILLING) {
- time_t now = now_realtime_sec();
- sender_start(s);
- buffer_sprintf(s->build, "TIMESTAMP %"PRId64"", (int64_t)now);
- sender_commit(s);
- }
+
+ if(unlikely(!attempt_to_connect(s)))
+ continue;
+
+ s->last_traffic_seen_t = now_monotonic_sec();
rrdpush_claimed_id(s->host);
+ rrdpush_send_host_labels(s->host);
+
+ rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
+
continue;
}
// If the TCP window never opened then something is wrong, restart connection
- if(unlikely(now_monotonic_sec() - s->last_sent_t > s->timeout)) {
+ if(unlikely(now_monotonic_sec() - s->last_traffic_seen_t > s->timeout &&
+ !rrdpush_sender_pending_replication_requests(s) &&
+ !rrdpush_sender_replicating_charts(s)
+ )) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", s->host->hostname, s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
+ error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
rrdpush_sender_thread_close_socket(s->host);
continue;
}
- worker_is_idle();
-
- // Wait until buffer opens in the socket or a rrdset_done_push wakes us
- fds[Collector].revents = 0;
- fds[Socket].revents = 0;
- fds[Socket].fd = s->host->rrdpush_sender_socket;
-
netdata_mutex_lock(&s->mutex);
- char *chunk;
- size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk);
- chunk = NULL; // Do not cache pointer outside of region - could be invalidated
+ size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL);
+ size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
netdata_mutex_unlock(&s->mutex);
- if(outstanding) {
+
+ worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->host->sender->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->host->sender->buffer->max_size);
+
+ if(outstanding)
s->send_attempts++;
- fds[Socket].events = POLLIN | POLLOUT;
- }
- else {
- fds[Socket].events = POLLIN;
+
+ if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
+ if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
+ error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
+ rrdhost_hostname(s->host));
+ rrdpush_sender_thread_close_socket(s->host);
+ break;
+ }
}
- int retval = poll(fds, 2, 1000);
+ worker_is_idle();
+
+ // Wait until buffer opens in the socket or a rrdset_done_push wakes us
+ enum {
+ Collector = 0,
+ Socket = 1,
+ };
+ struct pollfd fds[2] = {
+ [Collector] = {
+ .fd = s->rrdpush_sender_pipe[PIPE_READ],
+ .events = POLLIN,
+ .revents = 0,
+ },
+ [Socket] = {
+ .fd = s->rrdpush_sender_socket,
+ .events = POLLIN | (outstanding ? POLLOUT : 0 ),
+ .revents = 0,
+ }
+ };
+ int poll_rc = poll(fds, 2, 1000);
+
debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
fds[Collector].revents, fds[Socket].revents, outstanding);
if(unlikely(netdata_exit)) break;
+ internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ],
+ "STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to);
+
+ internal_error(fds[Socket].fd != s->rrdpush_sender_socket,
+ "STREAM %s [send to %s]: socket changed after poll().", rrdhost_hostname(s->host), s->connected_to);
+
// Spurious wake-ups without error - loop again
- if (retval == 0 || ((retval == -1) && (errno == EAGAIN || errno == EINTR))) {
+ if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
debug(D_STREAM, "Spurious wakeup");
continue;
}
// Only errors from poll() are internal, but try restarting the connection
- if(unlikely(retval == -1)) {
+ if(unlikely(poll_rc == -1)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
- error("STREAM %s [send to %s]: failed to poll(). Closing socket.", s->host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
+ rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
rrdpush_sender_thread_close_socket(s->host);
continue;
}
+ // If we have data and have seen the TCP window open then try to close it by a transmission.
+ if(likely(outstanding && (fds[Socket].revents & POLLOUT))) {
+ worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND);
+ ssize_t bytes = attempt_to_send(s);
+ if(bytes > 0) {
+ s->last_traffic_seen_t = now_monotonic_sec();
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, (NETDATA_DOUBLE)bytes);
+ }
+ }
+
// If the collector woke us up then empty the pipe to remove the signal
- if (fds[Collector].revents & POLLIN || fds[Collector].revents & POLLPRI) {
+ if (fds[Collector].revents & (POLLIN|POLLPRI)) {
worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
- char buffer[1000 + 1];
- if (read(s->host->rrdpush_sender_pipe[PIPE_READ], buffer, 1000) == -1)
- error("STREAM %s [send to %s]: cannot read from internal pipe.", s->host->hostname, s->connected_to);
+ if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1)
+ error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
}
// Read as much as possible to fill the buffer, split into full lines for execution.
if (fds[Socket].revents & POLLIN) {
worker_is_busy(WORKER_SENDER_JOB_SOCKET_RECEIVE);
- attempt_read(s);
+ ssize_t bytes = attempt_read(s);
+ if(bytes > 0) {
+ s->last_traffic_seen_t = now_monotonic_sec();
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, (NETDATA_DOUBLE)bytes);
+ }
}
- worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
- execute_commands(s);
+ if(unlikely(s->read_len))
+ execute_commands(s);
- // If we have data and have seen the TCP window open then try to close it by a transmission.
- if (outstanding && fds[Socket].revents & POLLOUT) {
- worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND);
- attempt_to_send(s);
+ if(unlikely(fds[Collector].revents & (POLLERR|POLLHUP|POLLNVAL))) {
+ char *error = NULL;
+
+ if (unlikely(fds[Collector].revents & POLLERR))
+ error = "pipe reports errors (POLLERR)";
+ else if (unlikely(fds[Collector].revents & POLLHUP))
+ error = "pipe closed (POLLHUP)";
+ else if (unlikely(fds[Collector].revents & POLLNVAL))
+ error = "pipe is invalid (POLLNVAL)";
+
+ if(error) {
+ rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
+ error("STREAM %s [send to %s]: restarting internal pipe: %s.",
+ rrdhost_hostname(s->host), s->connected_to, error);
+ }
}
- // TODO-GAPS - why do we only check this on the socket, not the pipe?
- if (outstanding) {
+ if(unlikely(fds[Socket].revents & (POLLERR|POLLHUP|POLLNVAL))) {
char *error = NULL;
+
if (unlikely(fds[Socket].revents & POLLERR))
error = "socket reports errors (POLLERR)";
else if (unlikely(fds[Socket].revents & POLLHUP))
error = "connection closed by remote end (POLLHUP)";
else if (unlikely(fds[Socket].revents & POLLNVAL))
error = "connection is invalid (POLLNVAL)";
+
if(unlikely(error)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR);
- error("STREAM %s [send to %s]: restart stream because %s - %zu bytes transmitted.", s->host->hostname,
- s->connected_to, error, s->sent_bytes_on_this_connection);
+ error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
+ rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
}
// protection from overflow
- if (s->overflow) {
+ if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
errno = 0;
- error("STREAM %s [send to %s]: buffer full (%zu-bytes) after %zu bytes. Restarting connection",
- s->host->hostname, s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
+ error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
+ rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
+
+ worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication.requests));
}
netdata_thread_cleanup_pop(1);
diff --git a/streaming/stream.conf b/streaming/stream.conf
index 33172bbcb..7c9ccc9b8 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -33,36 +33,31 @@
destination =
# Skip Certificate verification?
- #
# The netdata child is configurated to avoid invalid SSL/TLS certificate,
# so certificates that are self-signed or expired will stop the streaming.
# Case the server certificate is not valid, you can enable the use of
# 'bad' certificates setting the next option as 'yes'.
- #
#ssl skip certificate verification = yes
# Certificate Authority Path
+ # OpenSSL has a default directory where the known certificates are stored.
+ # In case it is necessary, it is possible to change this rule using the variable
+ # "CApath", e.g. CApath = /etc/ssl/certs/
#
- # OpenSSL has a default directory where the known certificates are stored,
- # case it is necessary it is possible to change this rule using the variable
- # "CApath"
- #
- #CApath = /etc/ssl/certs/
+ #CApath =
# Certificate Authority file
+ # When the Netdata parent has a certificate that is not recognized as valid,
+ # we can add it to the list of known certificates in "CApath" and give it to
+ # Netdata as an argument, e.g. CAfile = /etc/ssl/certs/cert.pem
#
- # When the Netdata parent has certificate, that is not recognized as valid,
- # we can add this certificate in the list of known certificates in CApath
- # and give for Netdata as argument.
- #
- #CAfile = /etc/ssl/certs/cert.pem
+ #CAfile =
# The API_KEY to use (as the sender)
api key =
# Stream Compression
- #
- # The netdata child is configurated to enable stream compression by default.
+ # The default is enabled
# You can control stream compression in this agent with options: yes | no
#enable compression = yes
@@ -91,6 +86,7 @@
reconnect delay seconds = 5
# Sync the clock of the charts for that many iterations, when starting.
+ # It is ignored when replication is enabled
initial clock resync iterations = 60
# -----------------------------------------------------------------------------
@@ -115,6 +111,11 @@
[API_KEY]
# Default settings for this API key
+ # This GUID is to be used as an API key from remote agents connecting
+ # to this machine. Failure to match such a key, denies access.
+ # YOU MUST SET THIS FIELD ON ALL API KEYS.
+ type = api
+
# You can disable the API key, by setting this to: no
# The default (for unknown API keys) is: no
enabled = no
@@ -127,9 +128,8 @@
# The default history in entries, for all hosts using this API key.
# You can also set it per host below.
- # If you don't set it here, the history size of the central netdata
- # will be used.
- default history = 3600
+ # For the default db mode (dbengine), this is ignored.
+ #default history = 3600
# The default memory mode to be used for all hosts using this API key.
# You can also set it per host below.
@@ -140,7 +140,7 @@
# ram keep it in RAM, don't touch the disk
# none no database at all (use this on headless proxies)
# dbengine like a traditional database
- default memory mode = ram
+ #default memory mode = dbengine
# Shall we enable health monitoring for the hosts using this API key?
# 3 possible values:
@@ -150,7 +150,7 @@
# ensure that the netdata process on the child is gracefully stopped, to prevent invalid last_collected alarms
# You can also set it per host, below.
# The default is taken from [health].enabled of netdata.conf
- health enabled by default = auto
+ #health enabled by default = auto
# postpone alarms for a short period after the sender is connected
default postpone alarms on connect seconds = 60
@@ -163,11 +163,19 @@
#default proxy send charts matching = *
# Stream Compression
- #
- # The stream with the child can be configurated to enable stream compression.
+ # By default it is enabled.
# You can control stream compression in this parent agent stream with options: yes | no
#enable compression = yes
+ # Replication
+ # Enable replication for all hosts using this api key. Default: enabled
+ #enable replication = yes
+
+ # How many seconds to replicate from each child. Default: a day
+ #seconds to replicate = 86400
+
+ # The duration we want to replicate per each step.
+ #replication_step = 600
# -----------------------------------------------------------------------------
# 3. PER SENDING HOST SETTINGS, ON PARENT NETDATA
@@ -184,6 +192,11 @@
# you can give settings for each sending host here.
[MACHINE_GUID]
+ # This GUID is to be used as a MACHINE GUID from remote agents connecting
+ # to this machine, not an API key.
+ # YOU MUST SET THIS FIELD ON ALL MACHINE GUIDs.
+ type = machine
+
# enable this host: yes | no
# When disabled, the parent will not receive metrics for this host.
# THIS IS NOT A SECURITY MECHANISM - AN ATTACKER CAN SET ANY OTHER GUID.
@@ -197,14 +210,15 @@
# and at stream.conf [API_KEY].allow from
allow from = *
- # The number of entries in the database
- history = 3600
+ # The number of entries in the database.
+ # This is ignored for db mode dbengine.
+ #history = 3600
# The memory mode of the database: save | map | ram | none | dbengine
- memory mode = save
+ #memory mode = dbengine
# Health / alarms control: yes | no | auto
- health enabled = yes
+ #health enabled = auto
# postpone alarms when the sender connects
postpone alarms on connect seconds = 60
@@ -217,8 +231,16 @@
#proxy send charts matching = *
# Stream Compression
- #
- # The stream with the child can be configurated to enable stream compression.
+ # By default, enabled.
# You can control stream compression in this parent agent stream with options: yes | no
#enable compression = yes
- \ No newline at end of file
+
+ # Replication
+ # Enable replication for all hosts using this api key.
+ #enable replication = yes
+
+ # How many seconds to replicate from each child.
+ #seconds to replicate = 86400
+
+ # The duration we want to replicate per each step.
+ #replication_step = 600